Skip to content
This repository was archived by the owner on Nov 30, 2023. It is now read-only.

Unexpected bytes in ByteBuffer of a TCP connection #34

@MichaelSteinecke

Description

@MichaelSteinecke

Hello,

I'm trying to utilize Helios in my application to get rid of performance limitations of the former native .Net TcpClient/UdpClient based implementation when connecting with a lot of remote devices. Setting up the TCP connection worked after some try and error.

I've wrote a custom MessageDecoder implementation to reflect our protocol. however, at some point i always get malformed/unexpected bytes in the stream.
Sometimes the data looks like the devices UDP discovery broadcast. Sometimes it could be DHCP discover broadcast messages. But how could they end up in the TCP stream?

I've seen in the LengthFieldFrameBasedDecoder there is a mechanism for discarding too long frames. Is tat intended to fix the behavior, I've discovered?

Edit:
I'm using Helios 1.4.0 from NuGet

Edit 2 relevant code:

public class MessageDecoder : MessageDecoderBase
{
    readonly int headerLength = Marshal.SizeOf(typeof(BaseMessage));
    public MessageDecoder()
    { }

    public override void Decode(Helios.Net.IConnection connection, Helios.Buffers.IByteBuf buffer, out List<Helios.Buffers.IByteBuf> decoded)
    {
        decoded = new List<Helios.Buffers.IByteBuf>();
        byte[] bytesHeader = new byte[headerLength];
        BaseMessage header = StructInitializer<BaseMessage>.ByteArrayToStruct(bytesHeader);

        if (buffer.ReadableBytes >= headerLength)
        {
            buffer.GetBytes(0, bytesHeader, 0, headerLength);
            header = StructInitializer<BaseMessage>.ByteArrayToStruct(bytesHeader);
        }
        else
        {
            return;
        }

        // header is valid?
        if (header.IsValid)
        {
            if (buffer.ReadableBytes >= (int)(headerLength + header.DataLength))
            {
                var message = buffer.ReadBytes((int)(headerLength + header.DataLength));
                decoded.Add(message);
            }
        else
        {
            // If this happens, the connections is essentially broken
            // It seems as no more data is passed to the decoder, however the connection is still alive and data gets transmitted
            buffer.SkipBytes(buffer.ReadableBytes);
        }
    }

    public override IMessageDecoder Clone()
    {
        return new MessageDecoder();
    }
}


public static class ConnectionManager
{
    static ClientBootstrap _heliosBootstrapper;
    static IConnectionFactory _tcpConnectionFactory;

    static ServerBootstrap _heliosBroadcastBootstrapper;
    static IConnectionFactory _udpConnectionFactory;

    static ConnectionManager()
    {       
        _heliosBootstrapper = new ClientBootstrap();
        _tcpConnectionFactory = _heliosBootstrapper
            .WorkerThreads(1)
            .SetOption("connectTimeout", TimeSpan.FromMilliseconds(ConnectTimeout))
            .SetTransport(TransportType.Tcp)
            .SetEncoder(new NoOpEncoder())
            .SetDecoder(new MessageDecoder())
            .Build();

        _heliosBroadcastBootstrapper = new ServerBootstrap();
        _udpConnectionFactory = _heliosBootstrapper
            .WorkerThreads(1)
            .SetTransport(TransportType.Udp)
            .SetEncoder(new NoOpEncoder())
            .SetDecoder(new NoOpDecoder())              
            .Build();
    }

    public static void EnableDeviceConnections()
    {
        // Enabling broadcast from devices. Unconnected devices will be connected afterwards by ConnectDevice() in the BroadcastRecieved event handler
        var node = NodeBuilder.BuildNode().WithPort(UdpBroadcastPort).Host(IPAddress.Any);
        var local = NodeBuilder.BuildNode().Host(IPAddress.Any).WithPort(UdpBroadcastPort);
        //NormalConnectionBuilder builder = new NormalConnectionBuilder();
        var broadreciever = _udpConnectionFactory.NewConnection(local, node);
        broadreciever.Receive += broadreciever_Receive;
        broadreciever.OnConnection += broadreciever_OnConnection;
        broadreciever.OnError += broadreciever_OnError;
        broadreciever.OnDisconnection += broadreciever_OnDisconnection;
        broadreciever.Open();
        broadreciever.BeginReceive();
    }

    static void broadreciever_OnDisconnection(Helios.Exceptions.HeliosConnectionException reason, IConnection closedChannel)
    {

    }

    static void broadreciever_OnError(Exception ex, IConnection connection)
    {

    }

    static void broadreciever_OnConnection(INode remoteAddress, IConnection responseChannel)
    {
    }

    static void broadreciever_Receive(NetworkData incomingData, IConnection responseChannel)
    {
        var broadcast = StructInitializer<BroadcastStruct>.ByteArrayToStruct(incomingData.Buffer);
        if(BroadcastRecieved != null)
            BroadcastRecieved(broadcast);
    }

    public static event EventHandler<BroadcastStruct> SiplugBroadcastRecieved;

    public void ConnectDevice(IPAddress device)
    {
        var node = Helios.Topology.NodeBuilder.BuildNode();
        node.Host = device;
        node.Port = TcpDataPort;
        var connection = _tcpConnectionFactory.NewConnection(node);
        connection.Receive += connection_Receive;
        connection.OnError += connection_OnError;
        connection.OnDisconnection += connection_OnDisconnection;
        connection.OnConnection += connection_OnConnection;
        var connected = await connection.OpenAsync();
    }
}

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions