-
Notifications
You must be signed in to change notification settings - Fork 122
Unexpected bytes in ByteBuffer of a TCP connection #34
Description
Hello,
I'm trying to utilize Helios in my application to get rid of performance limitations of the former native .Net TcpClient/UdpClient based implementation when connecting with a lot of remote devices. Setting up the TCP connection worked after some try and error.
I've wrote a custom MessageDecoder implementation to reflect our protocol. however, at some point i always get malformed/unexpected bytes in the stream.
Sometimes the data looks like the devices UDP discovery broadcast. Sometimes it could be DHCP discover broadcast messages. But how could they end up in the TCP stream?
I've seen in the LengthFieldFrameBasedDecoder there is a mechanism for discarding too long frames. Is tat intended to fix the behavior, I've discovered?
Edit:
I'm using Helios 1.4.0 from NuGet
Edit 2 relevant code:
public class MessageDecoder : MessageDecoderBase
{
readonly int headerLength = Marshal.SizeOf(typeof(BaseMessage));
public MessageDecoder()
{ }
public override void Decode(Helios.Net.IConnection connection, Helios.Buffers.IByteBuf buffer, out List<Helios.Buffers.IByteBuf> decoded)
{
decoded = new List<Helios.Buffers.IByteBuf>();
byte[] bytesHeader = new byte[headerLength];
BaseMessage header = StructInitializer<BaseMessage>.ByteArrayToStruct(bytesHeader);
if (buffer.ReadableBytes >= headerLength)
{
buffer.GetBytes(0, bytesHeader, 0, headerLength);
header = StructInitializer<BaseMessage>.ByteArrayToStruct(bytesHeader);
}
else
{
return;
}
// header is valid?
if (header.IsValid)
{
if (buffer.ReadableBytes >= (int)(headerLength + header.DataLength))
{
var message = buffer.ReadBytes((int)(headerLength + header.DataLength));
decoded.Add(message);
}
else
{
// If this happens, the connections is essentially broken
// It seems as no more data is passed to the decoder, however the connection is still alive and data gets transmitted
buffer.SkipBytes(buffer.ReadableBytes);
}
}
public override IMessageDecoder Clone()
{
return new MessageDecoder();
}
}
public static class ConnectionManager
{
static ClientBootstrap _heliosBootstrapper;
static IConnectionFactory _tcpConnectionFactory;
static ServerBootstrap _heliosBroadcastBootstrapper;
static IConnectionFactory _udpConnectionFactory;
static ConnectionManager()
{
_heliosBootstrapper = new ClientBootstrap();
_tcpConnectionFactory = _heliosBootstrapper
.WorkerThreads(1)
.SetOption("connectTimeout", TimeSpan.FromMilliseconds(ConnectTimeout))
.SetTransport(TransportType.Tcp)
.SetEncoder(new NoOpEncoder())
.SetDecoder(new MessageDecoder())
.Build();
_heliosBroadcastBootstrapper = new ServerBootstrap();
_udpConnectionFactory = _heliosBootstrapper
.WorkerThreads(1)
.SetTransport(TransportType.Udp)
.SetEncoder(new NoOpEncoder())
.SetDecoder(new NoOpDecoder())
.Build();
}
public static void EnableDeviceConnections()
{
// Enabling broadcast from devices. Unconnected devices will be connected afterwards by ConnectDevice() in the BroadcastRecieved event handler
var node = NodeBuilder.BuildNode().WithPort(UdpBroadcastPort).Host(IPAddress.Any);
var local = NodeBuilder.BuildNode().Host(IPAddress.Any).WithPort(UdpBroadcastPort);
//NormalConnectionBuilder builder = new NormalConnectionBuilder();
var broadreciever = _udpConnectionFactory.NewConnection(local, node);
broadreciever.Receive += broadreciever_Receive;
broadreciever.OnConnection += broadreciever_OnConnection;
broadreciever.OnError += broadreciever_OnError;
broadreciever.OnDisconnection += broadreciever_OnDisconnection;
broadreciever.Open();
broadreciever.BeginReceive();
}
static void broadreciever_OnDisconnection(Helios.Exceptions.HeliosConnectionException reason, IConnection closedChannel)
{
}
static void broadreciever_OnError(Exception ex, IConnection connection)
{
}
static void broadreciever_OnConnection(INode remoteAddress, IConnection responseChannel)
{
}
static void broadreciever_Receive(NetworkData incomingData, IConnection responseChannel)
{
var broadcast = StructInitializer<BroadcastStruct>.ByteArrayToStruct(incomingData.Buffer);
if(BroadcastRecieved != null)
BroadcastRecieved(broadcast);
}
public static event EventHandler<BroadcastStruct> SiplugBroadcastRecieved;
public void ConnectDevice(IPAddress device)
{
var node = Helios.Topology.NodeBuilder.BuildNode();
node.Host = device;
node.Port = TcpDataPort;
var connection = _tcpConnectionFactory.NewConnection(node);
connection.Receive += connection_Receive;
connection.OnError += connection_OnError;
connection.OnDisconnection += connection_OnDisconnection;
connection.OnConnection += connection_OnConnection;
var connected = await connection.OpenAsync();
}
}