読者です 読者をやめる 読者になる 読者になる

Reactive Extensionsでセンサプログラミング

本エントリはC# Advent Calendar 2011 : ATNDの20日目です*1。前日はid:masakitkさんのWindows.FormsだってTDD!でした。

これまでReactive Extensionsがロボットの制御に使えるのではないかということで、いくつかエントリを書いてきましたが、今回はより具体的なセンサを使ったプログラミングについて考えてみます。

なお、本エントリではNuGetから以下のパッケージを参照に追加して使っています。

  • Rx_Experimental-Main 1.1.11111
  • Ix_Experimental-Main 1.1.10823
  • ReactiveProperty for Rx Experimental 0.3.2

2011/12/23追記:いくつかコメントをいただいたので補足記事書きました→「Reactive Extensionsでセンサプログラミング」の補足 - ZOETROPEの日記

URGセンサ

今回は、北陽電機のURGシリーズのセンサを対象にしたいと思います。

URGシリーズは、スキャナ式レンジセンサ(測域センサ)と呼ばれるもので、水平方向にレーザを走査して、センサの周囲にある物体までの距離を計測するセンサです。ロボットに搭載して周囲の障害物を見つけるために使われたりしています。

PCとURGセンサをUSBケーブルで接続するとシリアルポートとして認識されるので、C#ではSerialPortクラスを使って通信することができます。

PCとURGセンサの間ではSCIPという通信プロトコルによりメッセージをやり取りします。SCIPでは基本的にPCからセンサにコマンドメッセージを送信して、リプライメッセージを受け取るものとなっています。1つのメッセージは改行コード(ラインフィード)区切りの複数行文字列で構成されており、メッセージの区切りは改行コードが2つ並びます。なお、計測開始コマンドを送信すると、その後は周期的にセンサから計測データが送信されてきます。SCIPの詳細はSCIP コマンドの解説を参考にしてください。

監視可能なSerialPortクラス

それではまず、下記のページを参考にして、監視可能なSerialPortクラス(すなわちIObservableを実装したクラス)を作ってみます。

using System;
using System.IO.Ports;
using System.Reactive.Disposables;
using System.Reactive.Linq;

namespace RxURG
{
    public class ObservableSerialPort : IObservable<byte>, IDisposable
    {
        private readonly SerialPort _serialPort;

        public ObservableSerialPort(string portName, int baudRate = 9600, Parity parity = Parity.None, int dataBits = 8,StopBits stopBits = StopBits.One)
        {
            _serialPort = new SerialPort(portName, baudRate, parity, dataBits, stopBits);
            _serialPort.Open();
        }

        public IDisposable Subscribe(IObserver<byte> observer)
        {
            if (observer == null) throw new ArgumentNullException("observer");

            // 受信イベントが発生したときの処理
            var rcvEvent = Observable.FromEventPattern<SerialDataReceivedEventHandler, SerialDataReceivedEventArgs>(
                h => h.Invoke, h => _serialPort.DataReceived += h, h => _serialPort.DataReceived -= h)
                .Subscribe(e => {
                    if (e.EventArgs.EventType == SerialData.Eof)
                    {
                        observer.OnCompleted();
                    }
                    else
                    {
                        var buf = new byte[_serialPort.BytesToRead];
                        var len = _serialPort.Read(buf, 0, buf.Length);

                        // 受信したデータを1バイトずつObserverに通知する
                        Observable.Range(0, len).ForEach(i => observer.OnNext(buf[i]));
                    }
                });


            // エラーイベントが発生したときの処理
            var errEvent = Observable.FromEventPattern<SerialErrorReceivedEventHandler, SerialErrorReceivedEventArgs>
                (h => _serialPort.ErrorReceived += h, h => _serialPort.ErrorReceived -= h)
                .Subscribe(e => observer.OnError(new Exception(e.EventArgs.EventType.ToString())));

            // Disposeが呼ばれたらイベント登録を解除する
            return Disposable.Create(() => {
                rcvEvent.Dispose();
                errEvent.Dispose();
            });
        }

        public void Send(string text)
        {
            _serialPort.Write(text);
        }

        public void Dispose()
        {
            _serialPort.Close();
        }
    }
}

SerialPortのDataReceivedイベントを、Observable.FromEventPatternを使ってRxで扱えるように変換し、受信したデータを1バイトずつObserverに通知しています。また、EOFを受信したらOnCompleted、エラーが発生したらOnErrorで通知するようにしています。

ObservableSerialPortを使うと、メッセージの受信処理は以下のように記述できます。

var sensor = new ObservableSerialPort("COM1");
Observable.Defer(
    () => sensor.TakeWhile(x => x != '\n')
        .Aggregate(new List<int>(), (l, i) => { l.Add(i); return l; }))
    .Repeat()
    .Subscribe();

SCIPでは受信メッセージが改行コードで区切られているので、TakeWhileで改行コードが現れるまでデータを取得しAggregateで連結します。TakeWhileを使うと改行コードが来た時点で購読が解除されるので、Observable.DeferとRepeatで受信処理を繰り返し行うようにします。

パフォーマンスの問題

ここまでのプログラムで、受信したデータが1行ごとに通知されるようになりました。

しかし、受信したバイト配列をいったん1バイトずつバラバラにして、Aggregateで再度連結するという実装になっているため、パフォーマンスは非常に悪そうです。

URGセンサでは1回の受信データは最大3kB強になるので、3kBのデータを1000回処理したときにかかる時間を計測してみました。

  • 単純なループで処理した場合: 約70msec(1メッセージ70μsec)
  • 1バイトずつ通知しAggregateで連結した場合: 約7000msec(1メッセージ7msec)

100倍の性能差がありますね*2

URGセンサでは最大25msec周期でデータが送られてくるので、データを結合するだけで7msecもかかっているようではちょっと使い物になりません。

パフォーマンスアップ

というわけで、1バイトずつ通知を行うのではなく、1行ごとに通知を送るようにしてみましょう。

using System;
using System.Collections.Generic;
using System.IO.Ports;
using System.Linq;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Text;

namespace RxURGViewer
{
    class ObservableSerialPort : IObservable<string>, IDisposable
    {
        private readonly SerialPort _serialPort;

        public ObservableSerialPort(string portName, int baudRate = 9600, Parity parity = Parity.None, int dataBits = 8,StopBits stopBits = StopBits.One)
        {
            _serialPort = new SerialPort(portName, baudRate, parity, dataBits, stopBits);
            _serialPort.Open();
        }

        public IDisposable Subscribe(IObserver<string> observer)
        {
            if (observer == null) throw new ArgumentNullException("observer");

            // 受信イベントが発生したときの処理
            var rcvEvent = Observable.FromEventPattern<SerialDataReceivedEventHandler, SerialDataReceivedEventArgs>(
                h => h.Invoke, h => _serialPort.DataReceived += h, h => _serialPort.DataReceived -= h)
                .Select(e =>
                {
                    if (e.EventArgs.EventType == SerialData.Eof)
                    {
                        observer.OnCompleted();
                        return string.Empty;
                    }
                    // 受信データを文字列に変換
                    var buf = new byte[_serialPort.BytesToRead];
                    _serialPort.Read(buf, 0, buf.Length);
                    return Encoding.ASCII.GetString(buf);
                })
                .Scan(Tuple.Create(new List<string>(), ""),
                      (t, s) =>
                      {
                          // 前回の残り t.Item2 と 今回の受信データ s を連結する。
                          var source = String.Concat(t.Item2, s);
                          
                          // 改行コードがついている分は Item1 に入れて、Observerに通知する。
                          // 改行コードがついていない分は Item2 に入れ、次回のデータ受信時に処理する。
                          var items = source.Split('\n');
                          return Tuple.Create(items.Take(items.Length - 1).ToList(), items.Last());
                      })
                .SelectMany(x => x.Item1) // Item1だけをObserverに通知する。
                .Subscribe(observer);

            // エラーイベントが発生したときの処理
            var errEvent = Observable.FromEventPattern<SerialErrorReceivedEventHandler, SerialErrorReceivedEventArgs>
                (h => _serialPort.ErrorReceived += h, h => _serialPort.ErrorReceived -= h)
                .Subscribe(e => observer.OnError(new Exception(e.EventArgs.EventType.ToString())));

            // Disposeが呼ばれたらイベント登録を解除する
            return Disposable.Create(() =>
            {
                rcvEvent.Dispose();
                errEvent.Dispose();
            });
        }

        public void Send(string text)
        {
            _serialPort.Write(text);
        }

        public void Dispose()
        {
            _serialPort.Close();
        }
    }
}

シリアル通信では、1回の受信データが必ず1メッセージだとは限らないので、Scanメソッドを使って前回の受信データと最新の受信データを結合するようにしています。ちょっと分かりにくい(というか気持ち悪い・・・)ですが、これでパフォーマンスが大幅に向上しました。

コマンドの送信とリプライの受信

ObservableSerialPortを使って、コマンドの送受信を行ってみましょう。

SCIPでの1メッセージは2つの改行コードで区切られているので、空文字列がくるまでTakeWhileで読み込み、Aggregaeで連結すれば、1メッセージを取得することができます*3

var sensor = new ObservableSerialPort("COM1");
var messageObserver =
    Observable.Defer(() =>
        // 空文字列が現れるまで1つのリストにまとめる
        sensor.TakeWhile(s => s != string.Empty)
            .Aggregate(new List<string>(), (l, s) =>
            {
                l.Add(s);
                return l;
            })
    ).Repeat();

これで、例えばバージョン情報を取得する処理(VVコマンド)は以下のように書けるようになります。

var vvObserver = messageObserver 
   .Where(xs => xs[0].StartsWith("VV"))
   .Take(1)
   .PublishLast();
vvObserver.Connect();

sensor.Send("VV\n");

// VVコマンドの戻り値を待つ。3秒以内にリプライが来なければTimeoutException
var version = vvObserver.Timeout(TimeSpan.FromSeconds(3)).Wait();

計測結果の描画

センサから送信されたメッセージをを取得できるようになったので、計測データをWPFアプリケーションで描画してみましょう。

RxとGUIを連携させるときは、id:neueccさんのReactiveProperty - MVVM Extensions for Rx - Homeを使うと便利です。

まずXAMLでは、Polygonを使って計測結果を多角形で描画するようにします。計測開始ボタンもつけておきましょう。

<Window x:Class="RxURGViewer.MainWindow"
        xmlns="http://schemas.microsoft.com/winfx/2006/xaml/presentation"
        xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml"
        xmlns:vm="clr-namespace:RxURGViewer"
        Title="MainWindow"
        Height="600" Width="800">
    
    <Window.DataContext>
        <vm:MainWindowViewModel/>
    </Window.DataContext>
    <Window.Resources>
        <vm:PointCollectionConverter x:Key="pointCollectionConverter"/>
    </Window.Resources>

    <Canvas Background="Black">
        <Polygon Points="{Binding Points.Value,Converter={StaticResource pointCollectionConverter}}" Fill="GreenYellow"/>
        <Button Command="{Binding StartCommand}" Content="計測開始"/>
    </Canvas>
</Window>


続いてViewModelでは受信メッセージをデコードし、極座標から直交座標に変換、そして直交座標から画面上の座標に変換して描画します*4

using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Text;
using System.Threading;
using System.Windows;
using Codeplex.Reactive;

namespace RxURGViewer
{
    public class MainWindowViewModel
    {
        public ReactiveProperty<Point[]> Points { get; set; }
        public ReactiveCommand StartCommand { get; set; }

        public MainWindowViewModel()
        {
            var sensor = new ObservableSerialPort("COM1");

            var messageObserver =
                Observable.Defer(() =>
                    // 空文字が現れるまで1つのリストにまとめる
                    sensor.TakeWhile(s => s != string.Empty)
                        .Aggregate(new List<string>(), (l, s) =>
                        {
                            l.Add(s);
                            return l;
                        })
                ).Repeat();

            // センサから受信した計測データを座標変換して描画する
            Points = messageObserver
                    .Where(xs => xs[0].StartsWith("MD"))
                    .Select(Decode) // 受信データをデコード
                    .Select(xs => xs
                        .Select(PolarToCartesian) // 極座標から直交座標に変換
                        .Select(p => new Point(400.0 - (p.Y / 10.0), 300.0 - (p.X / 10.0))) // 描画用に座標変換
                        .ToArray())
                    .ToReactiveProperty();

            // 計測開始ボタンが押された時の動作
            StartCommand = new ReactiveCommand();
            StartCommand.Subscribe(_ => sensor.Send("MD0044072501000\n"));
        }

        /// <summary>
        /// 計測データをデコードして距離データのリストに変換する
        /// </summary>
        /// <param name="message">計測データ</param>
        /// <returns>距離データのリスト[mm]</returns>
        public IEnumerable<int> Decode(List<string> message)
        {
            // 最初の3行はエコーバック、エラーコード、タイムスタンプなので飛ばす。(本来はエラー処理します。)
            // 4行目以降は、チェックサム(最後の1文字)を取り除いて結合する。(本来はチェックサムの確認を行います)
            var data = string.Join("", message.Skip(3).Select(x => x.Remove(x.Length - 1)));

            var distance = Encoding.ASCII.GetBytes(data)
                .Buffer(3) // 3キャラエンコーディング方式なので、3つずつまとめる。
                .Where(xs=>xs.Count == 3)
                .Select(xs => (xs[0] - 0x30) * 4096 + (xs[1] - 0x30) * 64 + (xs[2] - 0x30)); // 距離データに変換

            return distance;
        }

        // センサに設定可能なパラメータであるが、今回は固定値とする。
        const double StartAngle = -120.0; // センサの計測開始角度[deg]
        const double EndAngle = 120.0; // センサの計測終了角度[deg]
        const double Resolution = 240.0 / 682.0; // センサの角度分解能[deg]

        /// <summary>
        /// 極座標を直交座標に変換する
        /// </summary>
        /// <param name="r">距離[mm]</param>
        /// <param name="index">方向のインデックス</param>
        /// <returns>計測データの直交座標表現(x[mm],y[mm])</returns>
        public Point PolarToCartesian(int r, int index)
        {
            var degree = index * Resolution + StartAngle;

            var theta = degree * Math.PI / 180.0;

            var x = r * Math.Cos(theta);
            var y = r * Math.Sin(theta);

            return new Point(x, y);
        }
    }
}


あとは、PointのリストをPointCollection型に変換するConverterを用意しておけばOKです。

using System;
using System.Collections.Generic;
using System.Globalization;
using System.Windows;
using System.Windows.Data;
using System.Windows.Media;

namespace RxURGViewer
{
    /// <summary>
    /// IEnumerable{Point}型のインスタンスをPointCollection型に変換するコンバータ
    /// </summary>
    public class PointCollectionConverter : IValueConverter
    {
        public object Convert(object value, Type targetType, object parameter, CultureInfo culture)
        {
            if (value == null) return null;
            if (typeof(IEnumerable<Point>).IsAssignableFrom(value.GetType()) && targetType == typeof(PointCollection))
            {
                return new PointCollection((IEnumerable<Point>)value);
            }
            return null;
        }

        public object ConvertBack(object value, Type targetType, object parameter, CultureInfo culture)
        {
            return null;
        }
    }
}


起動して計測開始ボタンを押すと、こんな感じでセンサの計測データが表示されます。

まとめ

Reactive Extensionsを使うことによって、センサの制御がずいぶんとスッキリ書けることが少しでも伝わったでしょうか。スレッドや排他制御などがまったくでてこないのでとてもシンプルですね(違った種類の難しさはあるんですけどね・・・)。


明日のC# Advent Calendar 2011 : ATNDid:suerさんです。

*1:ちなみに今日は僕の誕生日です。ついでにちょうど今日、本ブログのPVが10万を超えました。どうでもいいですね。はい。

*2:[http://d.hatena.ne.jp/okazuki/20111209/1323446919:title]でも100倍という性能差になったそうです。

*3:1メッセージ読むたびにイベントの登録が解除されるため、データの取りこぼしの可能性があるかも?

*4:本来、MVVM的にはこういった処理はModelに入れるべきですが、今回はサンプルということで全部ViewModelに書いてます。