Skip to content

signature-opensource/CK-CommChannel

Repository files navigation

CK-CommChannel

Provides

  • A CommunicationChannel abstraction using System.IO.Pipelines types around input/output byte streams to streamline (no pun intended) code reuse between different means of communication (eg. a network stream from a TcpClient/TcpListener or a stream from a SerialPort), or even use an in-memory stream (for eg. tests or monitoring),
  • CommunicationChannel implementations for TCP, SerialPort, and in-memory pipes or NetworkStreams,
  • MessageReader/MessageWriter/MessageHandler types that can be plugged on a channel's Reader/Writer to parse and write messages, and provide features such as automatic time-out, message reconstruction and cancellation.

A CommunicationChannel is a high level API that exposes stable StablePipeReader Reader { get; } and a StablePipeWriter Writer { get; } properties.

"Stable" means that a CommunicationChannel instance can be temporarily disconnected, reconnected or completely reconfigured under the hood without dropping/disposing any object.

Its implementation relies on StablePipeReader and Writer.

To read incoming messages, a MessageReaders should be used on a PipeReader and to write outgoing messages, MessageWriters should be used on a PipeWriter.

Both of them correctly handle timeouts and cancellations and minimize allocations as much as possible.

MessageReader offers a pull model (messages are read one after the other). A generic push adapter is available thanks to MessageHandler.

Getting started

NuGet packages

The CK.CommChannel package provides both abstractions (ie. ICommunicationChannel, CommunicationChannel), and implementations for TCP channels (TcpChannel) and in-memory channels (MemoryChannel)

Install-Package CK.CommChannel

The implementation for serial ports is provided by CK.CommChannel.Serial.

Install-Package CK.CommChannel.Serial

Creating a channel

Channels are created using a configuration type, and the static method at CommunicationChannel.OpenAsync().

Connecting to a TCP Server

var monitor = new ActivityMonitor();
var config = new TcpChannelConfiguration()
{
    Host = "127.0.0.1", // Destination hostname or IP address
    Port = 2101, // Destination TCP port
    AutoReconnect = true
};
using ICommunicationChannel channel = await CommunicationChannel.OpenAsync( monitor, config );

Connecting to a serial port (needs CK.CommChannel.Serial package)

var config = new SerialChannelConfiguration()
{
    PortName = "COM4", // The destination COM port. COM# on Windows, or eg. /dev/ttyUSB0 on Linux.
    BaudRate = 9600,
    Parity = Parity.None,
    AutoReconnect = true
};
using ICommunicationChannel channel = await CommunicationChannel.OpenAsync( monitor, config );

Creating an in-memory channel

// The IPipeChannel has a PipeWriter Input and a PipeReader Output that are bound to the
// future channel. This one uses the `Pipe` implementation:
IPipeChannel mem = MemoryChannel.AllocatePipeChannel( "TestMe!" );

// This one uses 4 NetworkStream and a TCPListener on a dynamic port.
// This enables to easily test behaviors (exceptions, read, flush) specific
// to network communications.
IPipeChannel viaNetwork = await MemoryChannel.AllocateNetworkStreamChannelAsync( "TestMe!" );

var config = new MemoryChannelConfiguration() { EndPointName = "TestMe!" };
ICommunicationChannel channel = await CommunicationChannel.OpenAsync( monitor, config );

channel.Dispose();
// Once done, the memory channel must be released.
await MemoryChannel.DeallocateAsync( "Test" );

IPipeChannel exposes a PipeReader and PipeWriter. This is not really easy to work with them. Thanks to a bool Revert on the MemoryChannelConfiguration, there is an easy way to establish a dialog between 2 parties as they can be "symmetric", both working with a CommunicationChannel.

await MemoryChannel.AllocateNetworkStreamChannelAsync( "Test" );

MemoryChannelConfiguration config1 = new MemoryChannelConfiguration
{
  EndPointName = "Test",
  AutoReconnect = true
};
var channel1 = await CommunicationChannel.OpenAsync( TestHelper.Monitor, config1 );

MemoryChannelConfiguration config2 = new MemoryChannelConfiguration
{
  EndPointName = "Test",
  AutoReconnect = true,
  Reverted = true
};
var channel2 = await CommunicationChannel.OpenAsync( TestHelper.Monitor, config2 );

// The 2 CommunicationChannels can then write/read bytes (or better: messages) to/from each other.

channel1.Dispose();
channel2.Dispose();
await MemoryChannel.DeallocateAsync( "Test" );

Sending and receiving data

All channel types expose System.IO.Pipelines types in the Reader and Writer properties, which are respectively used to receive and send data.

// Receiving data
ReadResult r = await channel.Reader.ReadAsync();
Console.WriteLine( $"Received data: {r.Buffer.Length} bytes" );
channel.Reader.AdvanceTo( r.Buffer.End );

// Sending data
byte[] helloBytes = Encoding.ASCII.GetBytes("Hello");
await channel.Writer.WriteAsync( helloBytes );

Careful with System.IO.Pipelines

For more information about how to use PipeReader and PipeWriter, read the docs!

When using PipeReader and PipeWriter, including as a consumer of this library, you should know the common pitfalls of PipeReader caused by problematic code, including:

  • Data loss
  • Infinite loops
  • Unresponsive application
  • Out of Memory (OOM)
  • Memory corruption

You should also know the common pitfalls of PipeWriter, and general tips when using them.

Always use MessageReader/MessageWriter/MessageHandler that are safe and efficient. If you think that you need to use the PipeReader/Writer directly, please ask us before!

Reconfiguring a channel

A channel can be reconfigured to change its properties, and can even change its communication type.

var tcpConfig = new TcpChannelConfiguration() { Host = "localhost", Port = 2101 };
using ICommunicationChannel channel = await CommunicationChannel.OpenAsync( monitor, tcpConfig );

tcpConfig.Port = 8080;
await channel.ReconfigureAsync( monitor, tcpConfig );

var serialConfig = new SerialChannelConfiguration() { PortName = "COM3" };
await channel.ReconfigureAsync( monitor, serialConfig );

var memoryConfig = new MemoryChannelConfiguration();
await channel.ReconfigureAsync( monitor, memoryConfig );

Note that channels do not expose their underlying type or configuration and this is intended.

About

No description, website, or topics provided.

Resources

Stars

Watchers

Forks

Packages

No packages published

Contributors 3

  •  
  •  
  •  

Languages