読者です 読者をやめる 読者になる 読者になる

FParsecでパースしてRoslynで組み立てる

F#

本エントリは、F# Advent Calendar 2011 - PARTAKEの27日目(ボーナスステージ最終日)です。前日は@さんのF#初心者による in キーワドの考察:Gushwell's C# Dev Notesでした。

F#は以前勉強してたこともあったんですが*1、最近はちょっとご無沙汰してました。

そこで「実践 F# 関数型プログラミング入門」を読んで勉強しなおしてみましたが、以前よりすんなりと頭に入ってきたように思います。やはり日本語の書籍があるというのはありがたいですね。

実践 F# 関数型プログラミング入門

実践 F# 関数型プログラミング入門

FParsec

最近は、RTStorageReactiveRTMというアプリケーションやフレームワークを作ったりしています。

これらのアプリケーションではCORBA(IIOP.NET)を使っているのですが、CORBAのコードを書いているとやっぱりIDLパーサの1つや2つ欲しくなりますよね?ね?

以前にMGrammarというツールを使ってパースしたこともあった(MGrammarでIDLをパースしてみた - ZOETROPEの日記)のですが、立ち消えになってしまったのか、MGrammarは先行きがよく分からない状況です。


というわけで、今回はFParsecを使ってみました。

まずFParsec - A Parser Combinator Library for F#からFParsec-0.9.1をダウンロードしてきてビルドします。

ところが、いきなり以下のようなメッセージが出てビルドに失敗してしまいました。

文字'・'は予期されていません。

これは、FParseCS/Strings.csにShift-JISで読み込めない文字(0x91,0x92)が含まれていることが原因のようです。

文字コードとしてはシングルクォーテーション的なものなので、シングルクォーテーションに書き換えるか、UTF8に変更して正しい文字に書き換えればよさそうです。とりあえずシングルクォーテーションに置き換えてビルドしました。

CORBA IDLをパース

ビルドしたFParsecを使って、さっそくCORBA IDLをパースしてみます。

最初は四苦八苦しましたが、なんとかパースできるようになりました。少し長いのでgistにおいて置きます。

まだconst関連が処理できなかったり、プリミティブ型と似た(longXxxxのような)名前の型名が処理できなかったりしますが、基本的な文法はだいたいパースできていると思います。

なお、FParsecの使い方に関しては下記の記事/サイトが非常に参考になりました。

FsUnitかNaturalSpecか

パーサの動作確認をするためには、テストフレームワークが必須です。

F#のテストフレームワークを調べてみると、どうやらFsUnitかNaturalSpecが有名なようです。どちらもNuGetから入れられますね。NuGetすばらしい。

今回は以下のバージョンのものを入れて比較してみました。

  • FsUnit 0.9.1
  • NaturalSpec 1.2.17.1


まずFsUnitですが、パース結果の判別共用体(AST)を比較したときにテストに失敗すると、メッセージがこんな感じになってしまいます。

Expected: <idl.ast+Definition+Interface>
But was:  <idl.ast+Definition+Interface>

これではテストに失敗しても何がおかしいのか分かりません。FsUnitはテストの実行をNUnitに丸投げしているだけなので、オブジェクトの中身までは表示してくれないんですね。

一方のNatualSpecでは「sprintf "%A"」で表示メッセージを組み立てているので、以下のように具体的な値の違いが分かります。

Elements are not equal.
Expected:Interface ("Test",[],[],null)
But was: Interface ("Hoge",[],[],null)

これでテストも捗りますね。今回はNatualSpecを使うことにしました。


しかし、NaturalSpecをNuGetで入れた直後は、以下のようなエラーが発生してビルドできませんでした。

FSC: エラー FS0219: 参照された、または既定の基本 CLI ライブラリ 'mscorlib' は、参照された F# コア ライブラリ 'packages\NaturalSpec.1.2.17.1\lib\FSharp.Core.dll' とバイナリ非互換です。ライブラリを再コンパイルするか、使用している CLI バージョンと一致する、このライブラリのバージョンへの明示的な参照を作成してください。
FSC: エラー FS0218: アセンブリ 'packages\NaturalSpec.1.2.17.1\lib\FSharp.Core.dll' を読み取れません

とりあえず、FSharp.Core.dllとFSharp.PowerPack.dllを参照から削除すれば動きましたが、ローカル環境と異なるバージョンのアセンブリが含まれてるんですかねー?

Roslyn

パースができてしまえば、あとはT4でソースコード生成するなり、CodeDOMでアセンブリを組み立てるなり自由自在です。

今回はせっかくなので、Roslynを使ってみましょう。なお、RoslynはまだCTP版なので未実装な機能がたくさんありますし、今後仕様が変わる可能性もあるのでご注意を。

Roslynも以下のバージョンのものがNuGetで入れられます。NuGet便利すぎ。

  • Roslyn 1.0.11014.5


以下はあまり面白い例ではないですが、CORBA IDLの文法で書いたstructから、自動実装プロパティを持つC#のクラスのソースコードとアセンブリを生成します。

module sample

open System
open System.IO
open System.Collections.Generic

open idl.parser
open idl.ast

open FParsec.Primitives
open FParsec.CharParsers
open FParsec.Error

open Roslyn.Compilers;
open Roslyn.Compilers.CSharp;


let convertPrimitive x =
    match x with
    | Primitive "short" -> SyntaxKind.ShortKeyword
    | Primitive "long" -> SyntaxKind.InKeyword
    | Primitive "double" -> SyntaxKind.DoubleKeyword
    | Primitive "float" -> SyntaxKind.FloatKeyword
    | String _ -> SyntaxKind.StringKeyword


let createProperty t name =
    Syntax.PropertyDeclaration(
        Unchecked.defaultof<SyntaxList<AttributeDeclarationSyntax>>,
        Syntax.TokenList(Syntax.Token(SyntaxKind.PublicKeyword)),
        Syntax.PredefinedType(Syntax.Token(convertPrimitive t)), // 名前付き引数で指定したいがtypeがキーワードなのでだめ。
        null,
        identifier = Syntax.Identifier((fun x -> match x with | SimpleDec i -> i) name),
        accessorList = Syntax.AccessorList(
            accessors = Syntax.List(
                Syntax.AccessorDeclaration(
                    kind = SyntaxKind.GetAccessorDeclaration,
                    semicolonTokenOpt = Syntax.Token(SyntaxKind.SemicolonToken)
                ),
                Syntax.AccessorDeclaration(
                    kind = SyntaxKind.SetAccessorDeclaration,
                    semicolonTokenOpt = Syntax.Token(SyntaxKind.SemicolonToken)
                )
            )
        )
    )

let createClass name members = 
    let props = 
        Seq.map (fun m -> match m with | Member (t,name) -> createProperty t name.Head) members
        |> Seq.cast<'MemberDeclarationSyntax>

    Syntax.TypeDeclaration(
        SyntaxKind.ClassDeclaration, 
        modifiers = Syntax.TokenList(Syntax.Token(SyntaxKind.PublicKeyword)),
        identifier = Syntax.Identifier(name),
        members = Syntax.List(props)
    )

let createCompilationUnit (expList : Definition list) =
    let types =
        Seq.map (fun c -> match c with | Struct (name, members) -> createClass name members) expList
        |> Seq.cast<'MemberDeclarationSyntax>
    
    Syntax.CompilationUnit(
        usings= Syntax.List(Syntax.UsingDirective(name= Syntax.ParseName("System"))),
        members= Syntax.List(types)
    )

let showSource (unit : CompilationUnitSyntax) = 
    unit |> SyntaxExtensions.Format
         |> printfn "%A"
    unit

let createAssembly (unit : CompilationUnitSyntax) = 
    let compilation = 
        Compilation.Create(
            "test.dll", 
            options = CompilationOptions(assemblyKind =  AssemblyKind.DynamicallyLinkedLibrary),
            syntaxTrees = [SyntaxTree.Create("test.cs", unit) ],
            references = [AssemblyFileReference(typeof<Object>.Assembly.Location)]
        )
        
    using (new FileStream("test.dll", FileMode.Create))( 
        fun file -> compilation.Emit(file))
        |> ignore


[<EntryPoint>]
let main(argv: string[]) =
    let input = "struct Test { string message; };"

    let ret = (run specification) <| input
    match ret with
        | Success(r, _, _) -> r
        | Failure (msg, err, _) -> failwith msg
    |> createCompilationUnit
    |> showSource
    |> createAssembly

    0

実行結果は以下のように表示されます。このコードをコンパイルしたtest.dllも生成されています。

using System;

public class Test
{
    public string message
    {
        get;
        set;
    }
}

まとめ

以上、CORBA IDLをFParsecでパースして、RoslynでSyntaxTreeを組み立てて、ソースコードやアセンブリを出力してみたという紹介記事でした。
RoslynはC#VBのパーサはあるものの独自パーサを持っていないので、こういう使い方もありなのかもしれません。


さて、F# Advent Calendar 2011はこれでおしまいですが、この1ヶ月間はいろんな視点から書かれたF#の記事を読むことができて非常に楽しかったです。
参加された皆様、お疲れさまでした!

*1:http://d.hatena.ne.jp/ZOETROPE/20080601#1212317876

「Reactive Extensionsでセンサプログラミング」の補足

C# Rx

先日書いたReactive Extensionsでセンサプログラミング - ZOETROPEの日記に、twitter経由で@さんからいくつかコメントをいただきました。
さらに、neue cc - RxとパフォーマンスとユニットテストとMoles再びという素敵な記事も!

というわけで、補足記事を書いておきます。

ToList

改行コードがくるまでのデータをリストにまとめる処理をAggregateを使って以下のように書いていました。

sensor.TakeWhile(x => x != '\n')
      .Aggregate(new List<int>(), (l, i) => { l.Add(i); return l; }));

これは、ToListやToArrayで置き換え可能です。

sensor.TakeWhile(x => x != '\n').ToList();

ToListのほうが可読性も高くていいですね。(Windows Phone 7版のReactive Extensionsでは使えないそうですが。)

イベントの解除漏れ

ObservableSerialPortでは、EOFを受信したときやエラーが発生したときにOnCompletedやOnErrorを呼び出していますが、その後にイベントの登録解除をしていないため、イベントの解除漏れが発生してしまいます。

以下のようにCompositeDisposableを使って、明示的に登録解除する必要があります。

public IDisposable Subscribe(IObserver<byte> observer)
{
    if (observer == null) throw new ArgumentNullException("observer");

    var disposable = new CompositeDisposable();

    // 受信イベントが発生したときの処理
    var received = _serialPort.DataReceivedAsObservable()
        .Subscribe(e => {
            if (e.EventType == SerialData.Eof)
            {
                observer.OnCompleted();
                disposable.Dispose();
            }
            else
            {
                var buf = new byte[_serialPort.BytesToRead];
                var len = _serialPort.Read(buf, 0, buf.Length);

                // 受信したデータを1バイトずつObserverに通知する
                Observable.Range(0, len).ForEach(i => observer.OnNext(buf[i]));
            }
        });

    // エラーイベントが発生したときの処理
    var error = _serialPort.ErrorReceivedAsObservable()
        .Subscribe(e => {
            observer.OnError(new Exception(e.EventType.ToString()));
            disposable.Dispose();
        });

    disposable.Add(received);
    disposable.Add(error);

    // Disposeが呼ばれたらイベント登録を解除する
    return disposable;
}

neueccさんの記事ではさらにスマートな実装になっています。
TakeWhile(e => e.EventType != SerialData.Eof)でEOFがきた時点でOnCompletedが呼ばれるようにしてあり、さらにDataReceivedとErrorReceivedをTakeUntilで結合することで、エラーが発生したらDataReceivedも解除されるようになっています。
こういう実装が思いつけるようになりたい・・・

TakeUntilの挙動については、Reactive Extensions再入門 その8「SkipとTakeメソッド」 - かずきのBlog@hatenaが分かりやすいです。

拡張メソッド

ObservableSerialPortのようにIObservableを実装したクラスを用意するか、neueccさんのように拡張メソッドを用意するか、ちょっと悩むところです。

ただ、今回の場合はSerialPortのOpenやSendをObservableSerialPortに移譲させるのも面倒なので、拡張メソッドのほうがいいんじゃないかと思います。

Moles

実センサを持っていたとしても、センサを準備するのが面倒だったり、わざとエラーを起こすことは難しかったりするので、モックを使ったテストはとても有効です。

neueccさんのコードをベースに具体的なデータを受信するテストを書いてみました。

Rx-Testingの使い方は、Rx-Testingの使い方 - ZOETROPEの日記をみて思い出したり。

[TestClass]
public class ObservableSerialPortTest : ReactiveTest
{
    [TestMethod, HostType("Moles")]
    public void データが受信できることを確認()
    {
        // EventArgsを捏造
        var chars = new MSerialDataReceivedEventArgs() {EventTypeGet = () => SerialData.Chars};

        // SerialPort::BytesToRead/SerialPort::Readで、計測データを返すようにする。
        MSerialPort.AllInstances.BytesToReadGet = (self) => _data.Length;
        MSerialPort.AllInstances.ReadByteArrayInt32Int32 = (self, buffer, offset, count) => {
            var len = _data.Length < count ? _data.Length : count;
            Array.Copy(_data, 0, buffer, offset, len);
            return len;
        };

        // SerialPort::Openでは何もしない
        MSerialPort.AllInstances.Open = self => { };

        var scheduler = new TestScheduler();
        var observer = scheduler.CreateObserver<byte[]>();

        // 時間10で受信イベントを発行
        MSerialPortExtensions.DataReceivedAsObservableSerialPort = _ => scheduler.CreateHotObservable(
            OnNext(10, chars))
            .Select(x => (SerialDataReceivedEventArgs)x);

        var serialPort = new ObservableSerialPort("COM1");
        serialPort.Subscribe(observer);

        // 時間を10進める
        scheduler.AdvanceTo(10);

        observer.Messages[0].Time.Is(10);
        observer.Messages[0].Value.Value.Is(_data);
    }

    private readonly byte[] _data = Encoding.ASCII.GetBytes(
        @"GD0044072501" + "\n" +
        @"00P" + "\n" +
        @"0DKO>" + "\n" +
        @"00i00i00i00i00k00k00n01101101101101101101100o00m00o00o0130130140]" + "\n" +
        @"14012012014015017017017016017017016016015015015014014014014015010" + "\n" +
        @"501801<01<01?01D01D01D01F01F01L01O01R01T01V01W01X01X01X01Z01Z01Ze" + "\n" +
        @"01\01b01j02;02`09H09H09Z09Z09_0:90:90:@0:@0:@0:;0:@0:;0:;0:90:90]" + "\n" +
        @"9Z08X08408408408608608608408408408408908908908908808608308008008V" + "\n" +
        @"007m07m07j07h07h07h07d06E04D04>04=04=04>04C04H04H04I04J04K04U04Ue" + "\n" +
        @"04X04X04X04W04W04W04W04[04]04_04`04`04h04l04l04n05005005305;05>0N" + "\n" +
        @"5D05F05J05M05Q05T05W05[05]05^05`05f05f05m05n065065065068065065060" + "\n" +
        @"906:06:06;06<06>06A06L06L06N06S06T06d07S07[09D0hH0hH0hH0gO0fk0fDV" + "\n" +
        @"0eg0eU0e@0db0db0db0000000000000000000000000a40`N0_o0_`0_G0_=0^a0^" + "\n" +
        @"^I0^<0]h0]W0]@0]00\X0\L0[l0[f0[S0[?0[00Zi0ZJ0ZC0Z70Z70Z70Z90Z90Z2" + "\n" +
        @"90Z40Z00XR0XR0XR0XR0XR0XO0XD0W]0VT0VT0VM0V;0Um0Uc0U]0UQ0UJ0UC0U9Y" + "\n" +
        @"0Te0Tc0T^0TK0T=0T70T60Sm0Sf0Sf0SR0SO0SD0S?0S70Rn0Rh0Rh0Rd0R]0RK0Y" + "\n" +
        @"RD0RD0R70R60R20Qo0Qb0Q^0Q\0QV0QL0QI0QH0QC0Q50Q40Q30Po0Pk0Pi0Pg0PD" + "\n" +
        @"a0P[0PR0PR0PQ0PI0PI0PG0PB0PB0P@0P?0P:0P90P00P00Oh0Od0Oc0Oc0O`0O_[" + "\n" +
        @"0O]0O]0OZ0OZ0OZ0OZ0OZ0O[0O[0Og0PO0PO0PO0PL0PL0PI0P90P90O_0OP0OP0k" + "\n" +
        @"OP0Od0P50P50P>0PG0PG0PC0PC0PC0Oa0OH0OH0OH0OJ0OK0OL0OK0OK0OL0OP0O1" + "\n" +
        @"Q0OQ0OQ0OR0OR0OT0OT0OU0OZ0OZ0O[0O[0O\0O]0Oc0Oc0Oc0Od0On0On0Oo0OoY" + "\n" +
        @"0P40P40P80P=0PC0PE0PE0PE0PN0PN0PP0PX0P`0Pb0Pg0Ph0Pm0Q90Q90Q90Q?0?" + "\n" +
        @"QC0QF0QI0QM0Q[0Qa0Qc0Qi0R20R20R=0RA0RG0RO0RR0RX0R]0Rj0S10S20S90ST" + "\n" +
        @"@0SJ0SP0SS0Sa0Sk0T80T:0T>0TI0TN0T]0Ta0Tl0U40U;0UN0UR0UV0Ul0V20V?5" + "\n" +
        @"0VC0VQ0Va0W30W50WH0Xg0Xn0Xn0Xm0Xm0Xm0Xm0Z30Z<0Zb0Zb0Zb0ZW0ZW0ZW0E" + "\n" +
        @"ZX0[20[50[S0[a0\;0\G0\V0\c0]=0]T0]a0^00^E0^[0^k0_J0_Y0`30`E0`Y0`2" + "\n" +
        @"g0aE0aW0al0bK0b\0c10cH0ck0d;0dS0dg0eF0ek0fE0f_0g?0g]0h;0iV0j`0jaW" + "\n" +
        @"0jc0lY0l]0la0le0m>0mn0n[0oQ10110i11512W0000070000000000000000000L" + "\n" +
        @"000000071?d1?d1Af1Af1B800000000000000000000000000000000000000000j" + "\n" +
        @"00000000000000000000000000000000000000000000000000000000000000000" + "\n" +
        @"00000000000000005@05905905905805304m04N03P03F03@02n03202i02b02Y0U" + "\n" +
        @"2:02101h01h01h01d01m01n01o02002002001i01d01d01d01l01l01l01l01l01m" + "\n" +
        @"l01o01o01o01o02102102102101k01k01k01k01k01h01_01S01Q01P01P01P01PW" + "\n" +
        @"01O01Q01O01N01N01N01N01M01M01G01I01H01G01H01H01G01E01A01>01=01=0J" + "\n" +
        @"1=01:0180170170160170180180180190190190190170140140140140140139" + "\n\n");
}

任意のタイミングでイベントを発行できるので素敵ですね。実センサでは発生しにくいパターンのテストも簡単に書けそうです。最初からこうやってテストを書いておけばよかったと反省・・・

Molesは以前使ったときに、テストの実行が非常に重い印象があったのですが、今回はそうでもありませんでした。以前使ったときのPCのスペックが低かっただけのようです。

ちなみに、ReSharper6.0のテストランナーだとMolesのテストが動かないバグがありました。6.1にアップグレードすればOKです。

Reactive Extensionsでセンサプログラミング

C# Rx

本エントリはC# Advent Calendar 2011 : ATNDの20日目です*1。前日はid:masakitkさんのWindows.FormsだってTDD!でした。

これまでReactive Extensionsがロボットの制御に使えるのではないかということで、いくつかエントリを書いてきましたが、今回はより具体的なセンサを使ったプログラミングについて考えてみます。

なお、本エントリではNuGetから以下のパッケージを参照に追加して使っています。

  • Rx_Experimental-Main 1.1.11111
  • Ix_Experimental-Main 1.1.10823
  • ReactiveProperty for Rx Experimental 0.3.2

2011/12/23追記:いくつかコメントをいただいたので補足記事書きました→「Reactive Extensionsでセンサプログラミング」の補足 - ZOETROPEの日記

URGセンサ

今回は、北陽電機のURGシリーズのセンサを対象にしたいと思います。

URGシリーズは、スキャナ式レンジセンサ(測域センサ)と呼ばれるもので、水平方向にレーザを走査して、センサの周囲にある物体までの距離を計測するセンサです。ロボットに搭載して周囲の障害物を見つけるために使われたりしています。

PCとURGセンサをUSBケーブルで接続するとシリアルポートとして認識されるので、C#ではSerialPortクラスを使って通信することができます。

PCとURGセンサの間ではSCIPという通信プロトコルによりメッセージをやり取りします。SCIPでは基本的にPCからセンサにコマンドメッセージを送信して、リプライメッセージを受け取るものとなっています。1つのメッセージは改行コード(ラインフィード)区切りの複数行文字列で構成されており、メッセージの区切りは改行コードが2つ並びます。なお、計測開始コマンドを送信すると、その後は周期的にセンサから計測データが送信されてきます。SCIPの詳細はSCIP コマンドの解説を参考にしてください。

監視可能なSerialPortクラス

それではまず、下記のページを参考にして、監視可能なSerialPortクラス(すなわちIObservableを実装したクラス)を作ってみます。

using System;
using System.IO.Ports;
using System.Reactive.Disposables;
using System.Reactive.Linq;

namespace RxURG
{
    public class ObservableSerialPort : IObservable<byte>, IDisposable
    {
        private readonly SerialPort _serialPort;

        public ObservableSerialPort(string portName, int baudRate = 9600, Parity parity = Parity.None, int dataBits = 8,StopBits stopBits = StopBits.One)
        {
            _serialPort = new SerialPort(portName, baudRate, parity, dataBits, stopBits);
            _serialPort.Open();
        }

        public IDisposable Subscribe(IObserver<byte> observer)
        {
            if (observer == null) throw new ArgumentNullException("observer");

            // 受信イベントが発生したときの処理
            var rcvEvent = Observable.FromEventPattern<SerialDataReceivedEventHandler, SerialDataReceivedEventArgs>(
                h => h.Invoke, h => _serialPort.DataReceived += h, h => _serialPort.DataReceived -= h)
                .Subscribe(e => {
                    if (e.EventArgs.EventType == SerialData.Eof)
                    {
                        observer.OnCompleted();
                    }
                    else
                    {
                        var buf = new byte[_serialPort.BytesToRead];
                        var len = _serialPort.Read(buf, 0, buf.Length);

                        // 受信したデータを1バイトずつObserverに通知する
                        Observable.Range(0, len).ForEach(i => observer.OnNext(buf[i]));
                    }
                });


            // エラーイベントが発生したときの処理
            var errEvent = Observable.FromEventPattern<SerialErrorReceivedEventHandler, SerialErrorReceivedEventArgs>
                (h => _serialPort.ErrorReceived += h, h => _serialPort.ErrorReceived -= h)
                .Subscribe(e => observer.OnError(new Exception(e.EventArgs.EventType.ToString())));

            // Disposeが呼ばれたらイベント登録を解除する
            return Disposable.Create(() => {
                rcvEvent.Dispose();
                errEvent.Dispose();
            });
        }

        public void Send(string text)
        {
            _serialPort.Write(text);
        }

        public void Dispose()
        {
            _serialPort.Close();
        }
    }
}

SerialPortのDataReceivedイベントを、Observable.FromEventPatternを使ってRxで扱えるように変換し、受信したデータを1バイトずつObserverに通知しています。また、EOFを受信したらOnCompleted、エラーが発生したらOnErrorで通知するようにしています。

ObservableSerialPortを使うと、メッセージの受信処理は以下のように記述できます。

var sensor = new ObservableSerialPort("COM1");
Observable.Defer(
    () => sensor.TakeWhile(x => x != '\n')
        .Aggregate(new List<int>(), (l, i) => { l.Add(i); return l; }))
    .Repeat()
    .Subscribe();

SCIPでは受信メッセージが改行コードで区切られているので、TakeWhileで改行コードが現れるまでデータを取得しAggregateで連結します。TakeWhileを使うと改行コードが来た時点で購読が解除されるので、Observable.DeferとRepeatで受信処理を繰り返し行うようにします。

パフォーマンスの問題

ここまでのプログラムで、受信したデータが1行ごとに通知されるようになりました。

しかし、受信したバイト配列をいったん1バイトずつバラバラにして、Aggregateで再度連結するという実装になっているため、パフォーマンスは非常に悪そうです。

URGセンサでは1回の受信データは最大3kB強になるので、3kBのデータを1000回処理したときにかかる時間を計測してみました。

  • 単純なループで処理した場合: 約70msec(1メッセージ70μsec)
  • 1バイトずつ通知しAggregateで連結した場合: 約7000msec(1メッセージ7msec)

100倍の性能差がありますね*2

URGセンサでは最大25msec周期でデータが送られてくるので、データを結合するだけで7msecもかかっているようではちょっと使い物になりません。

パフォーマンスアップ

というわけで、1バイトずつ通知を行うのではなく、1行ごとに通知を送るようにしてみましょう。

using System;
using System.Collections.Generic;
using System.IO.Ports;
using System.Linq;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Text;

namespace RxURGViewer
{
    class ObservableSerialPort : IObservable<string>, IDisposable
    {
        private readonly SerialPort _serialPort;

        public ObservableSerialPort(string portName, int baudRate = 9600, Parity parity = Parity.None, int dataBits = 8,StopBits stopBits = StopBits.One)
        {
            _serialPort = new SerialPort(portName, baudRate, parity, dataBits, stopBits);
            _serialPort.Open();
        }

        public IDisposable Subscribe(IObserver<string> observer)
        {
            if (observer == null) throw new ArgumentNullException("observer");

            // 受信イベントが発生したときの処理
            var rcvEvent = Observable.FromEventPattern<SerialDataReceivedEventHandler, SerialDataReceivedEventArgs>(
                h => h.Invoke, h => _serialPort.DataReceived += h, h => _serialPort.DataReceived -= h)
                .Select(e =>
                {
                    if (e.EventArgs.EventType == SerialData.Eof)
                    {
                        observer.OnCompleted();
                        return string.Empty;
                    }
                    // 受信データを文字列に変換
                    var buf = new byte[_serialPort.BytesToRead];
                    _serialPort.Read(buf, 0, buf.Length);
                    return Encoding.ASCII.GetString(buf);
                })
                .Scan(Tuple.Create(new List<string>(), ""),
                      (t, s) =>
                      {
                          // 前回の残り t.Item2 と 今回の受信データ s を連結する。
                          var source = String.Concat(t.Item2, s);
                          
                          // 改行コードがついている分は Item1 に入れて、Observerに通知する。
                          // 改行コードがついていない分は Item2 に入れ、次回のデータ受信時に処理する。
                          var items = source.Split('\n');
                          return Tuple.Create(items.Take(items.Length - 1).ToList(), items.Last());
                      })
                .SelectMany(x => x.Item1) // Item1だけをObserverに通知する。
                .Subscribe(observer);

            // エラーイベントが発生したときの処理
            var errEvent = Observable.FromEventPattern<SerialErrorReceivedEventHandler, SerialErrorReceivedEventArgs>
                (h => _serialPort.ErrorReceived += h, h => _serialPort.ErrorReceived -= h)
                .Subscribe(e => observer.OnError(new Exception(e.EventArgs.EventType.ToString())));

            // Disposeが呼ばれたらイベント登録を解除する
            return Disposable.Create(() =>
            {
                rcvEvent.Dispose();
                errEvent.Dispose();
            });
        }

        public void Send(string text)
        {
            _serialPort.Write(text);
        }

        public void Dispose()
        {
            _serialPort.Close();
        }
    }
}

シリアル通信では、1回の受信データが必ず1メッセージだとは限らないので、Scanメソッドを使って前回の受信データと最新の受信データを結合するようにしています。ちょっと分かりにくい(というか気持ち悪い・・・)ですが、これでパフォーマンスが大幅に向上しました。

コマンドの送信とリプライの受信

ObservableSerialPortを使って、コマンドの送受信を行ってみましょう。

SCIPでの1メッセージは2つの改行コードで区切られているので、空文字列がくるまでTakeWhileで読み込み、Aggregaeで連結すれば、1メッセージを取得することができます*3

var sensor = new ObservableSerialPort("COM1");
var messageObserver =
    Observable.Defer(() =>
        // 空文字列が現れるまで1つのリストにまとめる
        sensor.TakeWhile(s => s != string.Empty)
            .Aggregate(new List<string>(), (l, s) =>
            {
                l.Add(s);
                return l;
            })
    ).Repeat();

これで、例えばバージョン情報を取得する処理(VVコマンド)は以下のように書けるようになります。

var vvObserver = messageObserver 
   .Where(xs => xs[0].StartsWith("VV"))
   .Take(1)
   .PublishLast();
vvObserver.Connect();

sensor.Send("VV\n");

// VVコマンドの戻り値を待つ。3秒以内にリプライが来なければTimeoutException
var version = vvObserver.Timeout(TimeSpan.FromSeconds(3)).Wait();

計測結果の描画

センサから送信されたメッセージをを取得できるようになったので、計測データをWPFアプリケーションで描画してみましょう。

RxとGUIを連携させるときは、id:neueccさんのReactiveProperty - MVVM Extensions for Rx - Homeを使うと便利です。

まずXAMLでは、Polygonを使って計測結果を多角形で描画するようにします。計測開始ボタンもつけておきましょう。

<Window x:Class="RxURGViewer.MainWindow"
        xmlns="http://schemas.microsoft.com/winfx/2006/xaml/presentation"
        xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml"
        xmlns:vm="clr-namespace:RxURGViewer"
        Title="MainWindow"
        Height="600" Width="800">
    
    <Window.DataContext>
        <vm:MainWindowViewModel/>
    </Window.DataContext>
    <Window.Resources>
        <vm:PointCollectionConverter x:Key="pointCollectionConverter"/>
    </Window.Resources>

    <Canvas Background="Black">
        <Polygon Points="{Binding Points.Value,Converter={StaticResource pointCollectionConverter}}" Fill="GreenYellow"/>
        <Button Command="{Binding StartCommand}" Content="計測開始"/>
    </Canvas>
</Window>


続いてViewModelでは受信メッセージをデコードし、極座標から直交座標に変換、そして直交座標から画面上の座標に変換して描画します*4

using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Text;
using System.Threading;
using System.Windows;
using Codeplex.Reactive;

namespace RxURGViewer
{
    public class MainWindowViewModel
    {
        public ReactiveProperty<Point[]> Points { get; set; }
        public ReactiveCommand StartCommand { get; set; }

        public MainWindowViewModel()
        {
            var sensor = new ObservableSerialPort("COM1");

            var messageObserver =
                Observable.Defer(() =>
                    // 空文字が現れるまで1つのリストにまとめる
                    sensor.TakeWhile(s => s != string.Empty)
                        .Aggregate(new List<string>(), (l, s) =>
                        {
                            l.Add(s);
                            return l;
                        })
                ).Repeat();

            // センサから受信した計測データを座標変換して描画する
            Points = messageObserver
                    .Where(xs => xs[0].StartsWith("MD"))
                    .Select(Decode) // 受信データをデコード
                    .Select(xs => xs
                        .Select(PolarToCartesian) // 極座標から直交座標に変換
                        .Select(p => new Point(400.0 - (p.Y / 10.0), 300.0 - (p.X / 10.0))) // 描画用に座標変換
                        .ToArray())
                    .ToReactiveProperty();

            // 計測開始ボタンが押された時の動作
            StartCommand = new ReactiveCommand();
            StartCommand.Subscribe(_ => sensor.Send("MD0044072501000\n"));
        }

        /// <summary>
        /// 計測データをデコードして距離データのリストに変換する
        /// </summary>
        /// <param name="message">計測データ</param>
        /// <returns>距離データのリスト[mm]</returns>
        public IEnumerable<int> Decode(List<string> message)
        {
            // 最初の3行はエコーバック、エラーコード、タイムスタンプなので飛ばす。(本来はエラー処理します。)
            // 4行目以降は、チェックサム(最後の1文字)を取り除いて結合する。(本来はチェックサムの確認を行います)
            var data = string.Join("", message.Skip(3).Select(x => x.Remove(x.Length - 1)));

            var distance = Encoding.ASCII.GetBytes(data)
                .Buffer(3) // 3キャラエンコーディング方式なので、3つずつまとめる。
                .Where(xs=>xs.Count == 3)
                .Select(xs => (xs[0] - 0x30) * 4096 + (xs[1] - 0x30) * 64 + (xs[2] - 0x30)); // 距離データに変換

            return distance;
        }

        // センサに設定可能なパラメータであるが、今回は固定値とする。
        const double StartAngle = -120.0; // センサの計測開始角度[deg]
        const double EndAngle = 120.0; // センサの計測終了角度[deg]
        const double Resolution = 240.0 / 682.0; // センサの角度分解能[deg]

        /// <summary>
        /// 極座標を直交座標に変換する
        /// </summary>
        /// <param name="r">距離[mm]</param>
        /// <param name="index">方向のインデックス</param>
        /// <returns>計測データの直交座標表現(x[mm],y[mm])</returns>
        public Point PolarToCartesian(int r, int index)
        {
            var degree = index * Resolution + StartAngle;

            var theta = degree * Math.PI / 180.0;

            var x = r * Math.Cos(theta);
            var y = r * Math.Sin(theta);

            return new Point(x, y);
        }
    }
}


あとは、PointのリストをPointCollection型に変換するConverterを用意しておけばOKです。

using System;
using System.Collections.Generic;
using System.Globalization;
using System.Windows;
using System.Windows.Data;
using System.Windows.Media;

namespace RxURGViewer
{
    /// <summary>
    /// IEnumerable{Point}型のインスタンスをPointCollection型に変換するコンバータ
    /// </summary>
    public class PointCollectionConverter : IValueConverter
    {
        public object Convert(object value, Type targetType, object parameter, CultureInfo culture)
        {
            if (value == null) return null;
            if (typeof(IEnumerable<Point>).IsAssignableFrom(value.GetType()) && targetType == typeof(PointCollection))
            {
                return new PointCollection((IEnumerable<Point>)value);
            }
            return null;
        }

        public object ConvertBack(object value, Type targetType, object parameter, CultureInfo culture)
        {
            return null;
        }
    }
}


起動して計測開始ボタンを押すと、こんな感じでセンサの計測データが表示されます。

まとめ

Reactive Extensionsを使うことによって、センサの制御がずいぶんとスッキリ書けることが少しでも伝わったでしょうか。スレッドや排他制御などがまったくでてこないのでとてもシンプルですね(違った種類の難しさはあるんですけどね・・・)。


明日のC# Advent Calendar 2011 : ATNDid:suerさんです。

*1:ちなみに今日は僕の誕生日です。ついでにちょうど今日、本ブログのPVが10万を超えました。どうでもいいですね。はい。

*2:[http://d.hatena.ne.jp/okazuki/20111209/1323446919:title]でも100倍という性能差になったそうです。

*3:1メッセージ読むたびにイベントの登録が解除されるため、データの取りこぼしの可能性があるかも?

*4:本来、MVVM的にはこういった処理はModelに入れるべきですが、今回はサンプルということで全部ViewModelに書いてます。

Reactive ExtensionsをRTミドルウェアに適用してみる

C# Rx RTM

先日のエントリーでは、Reactive Extensionsを使って、ロボットの位置を算出するプログラムを書いてみました。

しかし位置を算出するだけではあまり実用的ではありません。そこで今回は、RTミドルウェアにReactive Extensionsを適用することを考えてみます。

RTミドルウェアは、ロボットソフトウェアの機能(センサやアクチュエータやアルゴリズム)をRTコンポーネントと呼ばれる単位でモジュール化し、それらをネットワークを介して組み合わせてロボットシステムを構築するためのフレームワークです。

RTコンポーネントは、自身の状態に応じて周期的な処理を実行し、データポートやサービスポートなどの仕組みを利用して、他のRTコンポーネントと通信を行います。
RTコンポーネントでは、状態に応じた周期処理、他コンポーネントとの通信、センサの監視など、それぞれ非同期で動作する処理が多くあり、これらを組み合わせて制御するところにReactive Extensionsがマッチするのではないかと考えてみました。


というわけで、Reactive Extensionsを活用するとRTコンポーネントがこんな風に実装できるよというのを紹介します。
なお、C++で実装されたOpenRTM-aistとC#のコードを比較しているので、ちょっと分かりにくいですがご容赦を。

データポートの同期

まずデータポートの同期について考えてみます。

RTコンポーネントでは、複数のRTコンポーネントからのデータ入力を待ち受けるということがよくあります。

OpenRTM-aistでは、複数のデータ入力を待ちうける場合、公式ページにも書かれているように、on_executeのなかでisNew()でデータの有無を周期的にチェックするのが一般的でしょう。

 if (inport1.isNew() && inport2.isNew()) {
   indata1 = inport1.read();
   indata2 = inport2.read();
   
   // データを使った処理
   
   outport.write(outdata);
 }

しかしこの方法では、2つのデータが揃った瞬間ではなく、RTコンポーネントの動作周期のタイミングでしかデータが揃ったことをチェックできません。

Reactive Extensionsを使うと、2つの入力ポートのデータが揃うまで待ち、そのデータの和を出力ポートに書き込む処理を以下のように簡単に書くことができます。

    Observable.Zip(inport1, inport2, (indata1, indata2) => indata1 + indata2 )
      .Subscribe(outport);

Zip以外にも、MergeやCombineLeastなどの合成系のメソッドが使えますし、WhereでデータをフィルタリングしたりSelectで加工することもできます。

RTコンポーネントの活性化

OpenRTM-aistでは、RTコンポーネントを活性化したい場合、通常は以下のようなコードを書くと思います。

  ExecutionContext_ptr ec = comp->get_context(0);
  ec->activate_component(comp);

しかし、activate_component()メソッドはイベントを送るだけなので、実際にACTIVE状態に変わったかどうか分かりません。
そのため、以下のようにget_component_state()を何度か呼んで状態が変化するまで待つ必要があります。

  comp->activate(0);

  ExecutionContext_ptr ec = comp->get_context(0);

  while(true) {
    if( ec->get_component_state(comp) == ACTIVE_STATE ) break;
    Sleep(1);
  }

しかしこの方法では、get_component_stateで何度も通信が行われますし、スリープの時間によっては待ち時間が大きくなってしまいます。


OpenRTM-aistでは、ver.1.1からコンポーネントオブザーバという機能が実験的に追加されています。
この機能を利用すると、RTコンポーネントの状態変化などの通知を受け取ることができます。

OpenRTM-aistでコンポーネントオブザーバの機能を利用するには、rtc.confに以下の記述を追加して、RTコンポーネントを起動するだけです。(Windows版の場合)

# ComponentObserverConsumer.dllのパスを指定
manager.modules.load_path: ../ext/sdo/
manager.modules.preload:ComponentObserverConsumer.dll
sdo.service.provider.enabled_services: ALL


さて、コンポーネントオブザーバを有効にしたRTコンポーネントを、Reactive Extensionsを使って処理してみます。(少々分かりにくいですが、OpenRTM-aistのC++版のRTコンポーネントを、C#のプログラムで監視しているということです)

コンポーネントオブザーバからの状態変化通知をStateChangedAsObservableで受け取れるようにすると、RTコンポーネントを活性化して、状態がACTIVEになるまで待つという処理を以下のように書くことができます。

    // コンポーネントの状態がACTIVEになった通知をキャッシュする
    var obs = comp.StateChangedAsObservable()
      .Where(state => state == LifeCycleState.ACTIVE_STATE)
      .PublishLast();
    obs.Connect();
    
    // コンポーネントを活性化
    comp.ActivateAsync().First();
    
    // コンポーネントの状態がACTIVEになるまで待つ
    obs.First();

ここで、もしRTコンポーネントの活性化に失敗して状態が変化しなかった場合、obs.First()のところで待ち続けることになってしまいます。

Reactive Extensionsでは、以下のようにTimeoutメソッドを使ってタイムアウトを設定することができます。あとは、例外をcatchするなり、Catchメソッドで処理するなり、SubscribeのOnErrorで処理するなりしましょう。

    obs.Timeout(TimeSpan.FromSeconds(5)).First();

スレッド間同期

RTコンポーネントでは、一定の周期で動作する処理や、他のコンポーネントから非同期で呼びだされるデータポートやサービスポート、センサの監視やアクチュエータの制御などがそれぞれ別のスレッドで動作しています。

これを排他制御して動作させるのは非常に難しいのですが、Reactive ExtensionsではSchedulerという仕組みがあるので、スレッド間の同期がだいぶやりやすくなります。

例えば、以下のようにRTコンポーネントにExecutionContextSchedulerを持たせ、周期的に動作するOnExecuteと、非同期で呼び出されるデータポートの処理を同じSchedulerで動作させるようにします。
ExecutionContextSchedulerは、デフォルトでEventLoopSchedulerを使うようにしてあるので、OnExecuteとデータポートの処理が必ず1つのスレッド上で動作されるようになります。ロックを書く必要はありません。

class ECSchedulerTest : ReactiveComponent
{
    public ReactiveInPort<TimedLong> InPort { get; private set; }
    private IDisposable _disposer;

    public ECSchedulerTest()
        : base("ECSchedulerTest")
    {
        InPort = new ReactiveInPort<TimedLong>("in");
        AddPort(InPort);
    }

    protected override ReturnCode_t OnActivated(int execHandle)
    {
        _disposer = InPort
            .ObserveOn(ExecutionContextScheduler) // 以下の処理をExecutionContextSchedulerで実行するように切り替え
            .Subscribe(_ => { /* 排他制御の必要な処理 */ });
        
        return ReturnCode_t.RTC_OK;
    }

    // OnExecuteは、ExecutionContextSchedulerにより一定の周期で呼び出される
    protected override ReturnCode_t OnExecute(int execHandle)
    {
        /* 排他制御の必要な処理 */
        return ReturnCode_t.RTC_OK;
    }

    protected override ReturnCode_t OnDeactivated(int execHandle)
    {
        _disposer.Dispose();

        return ReturnCode_t.RTC_OK;
    }
}

RTコンポーネントのテスト

先日、Rx-Testingの使い方を紹介しましたが、Rx-Testingを使えばRTコンポーネントのテストも簡単に書けるようになります。

入力ポートに入れるデータは、入力するタイミングを任意に指定することができますし、OnExecuteの周期処理も実際の時間を待たなくても実行することができます。

例えば、入力ポートから入ってきたデータを2倍して出力するコンポーネントのテストは以下のようになります。

[TestClass]
public class ComponentTest : ReactiveTest
{
    [TestMethod]
    public void TargetComponentの入出力チェック()
    {
        var scheduler = new TestScheduler();

        // OutPortから出力されたデータを蓄えるためのもの。
        var recorder = scheduler.CreateObserver<TimedLong>();

        // InPortに入力するデータ。1秒ごとにデータを送る。
        var publisher = scheduler.CreateHotObservable(
            OnNext(TimeSpan.FromSeconds(1).Ticks, new TimedLong() {data = 1}),
            OnNext(TimeSpan.FromSeconds(2).Ticks, new TimedLong() {data = 2}),
            OnNext(TimeSpan.FromSeconds(3).Ticks, new TimedLong() {data = 3}),
            OnNext(TimeSpan.FromSeconds(4).Ticks, new TimedLong() {data = 4}),
            OnNext(TimeSpan.FromSeconds(5).Ticks, new TimedLong() {data = 5})
            );

        var comp = new TargetComponent();

        // コンポーネントのスケジューラをTestSchedulerに差し替える
        comp.ExecutionContextScheduler = scheduler;

        // 入出力ポートに接続
        comp.InPort.Connect(publisher);
        comp.OutPort.Connect(recorder);

        // コンポーネントを活性化
        var retRecorder = scheduler.CreateObserver<ReturnCode_t>();
        comp.ActivateAsync().Subscribe(retRecorder);
        scheduler.AdvanceBy(100);
        retRecorder.Messages.Count.Is(2);
        retRecorder.Messages.First().Value.Value.Is(ReturnCode_t.RTC_OK);

        // 時間を5秒進める
        scheduler.AdvanceBy(TimeSpan.FromSeconds(5).Ticks);

        // 入力データの値が2倍されて出力されていることを確認
        recorder.Messages.Count.Is(5);
        ReactiveAssert.AreElementsEqual(
            recorder.Messages.Select(x => x.Value.Value),
            new[] {
                    new TimedLong() {data = 2},
                    new TimedLong() {data = 4},
                    new TimedLong() {data = 6},
                    new TimedLong() {data = 8},
                    new TimedLong() {data = 10}
                });
    }
}

ReactiveRTM

ここまでに紹介した機能を持つRTミドルウェアとして、ReactiveRTMという名前でプロトタイプ実装を作ってみました。

zoetrope/ReactiveRTM · GitHub

一応、OpenRTM-aist-1.1やRTSystemEditorと連携させて動作させることはできますが、実装していない機能も多いので実用には程遠いです。(NuGetに登録もしたいのですが、あまり完成度の高くないものを登録するのもよくないかなと)

ちなみに、GUIアプリケーションとの連携サンプルや、IDLを書かずに独自のデータ型でデータポートを使ったり、C#のdynamicを使ってdRubyのようにサービスポートを呼び出すようなサンプルもあるので、興味があれば見てみてください。

そういえば、Erlang版のOpenRTMなどもありますが、コンセプトは近いのかもしれませんね。

Rx-Testingの使い方

Rx C#

Rxを使ったプログラミングは楽しいのですが、時間に依存してたり非同期だったりするのでテストを書こうと思うとなかなか難しい。

そこでReactive Extensionsでは、テスト用のクラスをいくつか用意してくれています。

テスト用のクラスを利用するためには、NuGetからRx-Testingを入れるか、インストーラな人はMicrosoft.Reactive.Testingを参照設定に追加します。

ITestableObserverとITestableObservable

Rxでプログラミングしている時のテスト対象は、何らかのIObserverやIObservableになると思います。

IObservableのテストをしたいときは、いつどんな通知が行われたかを確認したいですし、IObserverをテストしたいときは、任意のタイミングで通知できるIObservableが欲しい。

そのために、Rx-TestingではITestableObserverとITestableObservableという2種類のインタフェースと、その実装クラスであるMockObserver、HotObservable、ColdObservableというクラスを用意しています。

これらはinternalクラスなので、インスタンスはTestSchedulerから作ることになります。

例えば、MockObserverインスタンスは以下のように作ります。

var testScheduler = new TestScheduler();
var mockObserver = testScheduler.CreateObserver<int>();

IObservableのテスト

IObservableをテストしたい場合は、テスト対象のIObservableに対して、MockObserverをSubscribeします。

var testScheduler = new TestScheduler();
var mockObserver = testScheduler.CreateObserver<int>();
IObservable<int> testTarget = ・・・
testTarget.Subscribe(mockObserver);

この状態でtestTargetから通知が行われると、MockObserverのMessagesに通知結果が蓄積されます。

蓄積されるのは、Recorded>という型で、通知の時間とその値を保持しています。
また、どんな通知が行われたかは、Notificationのサブクラスの型(Notificatioを継承したOnNextNotification/OnErrorNotification/OnCompletedNotification)で判断することができます。


通知結果を確認するためには、ReactiveAssertのAreElementsEqualを使います。

しかし、期待する結果を用意するために、Recorded>のインスタンスを作るのはちょっと面倒です。
そこで、ReactiveTestクラスに、Recorded>を簡単に作るためにOnNext/OnError/OnCompletedというFactoryメソッドが用意されています。

テストクラスでReactiveTestを継承すると使いやすいですね。

というわけで、IObservableのテストは以下のように書くことができます。(テストフレームワークにはMSTestを使っています。)

using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Text;
using Microsoft.Reactive.Testing;
using Microsoft.VisualStudio.TestTools.UnitTesting;

[TestClass]
public class RxTest1 : ReactiveTest
{
    [TestMethod]
    public void Observableのテスト()
    {
        var testTarget = new Subject<int>(); //テストしたい対象

        var testScheduler = new TestScheduler();
        var mockObserver = testScheduler.CreateObserver<int>();
        
        // テスト対象を監視する。
        testTarget.Subscribe(mockObserver);

        // 時間を進めつつ通知を行う
        testTarget.OnNext(1);
        testScheduler.AdvanceBy(100);
        testTarget.OnNext(2);
        testScheduler.AdvanceBy(100);
        testTarget.OnNext(3);
        testScheduler.AdvanceBy(100);
        testTarget.OnCompleted();

        // 通知結果の確認
        ReactiveAssert.AreElementsEqual(
            mockObserver.Messages,
            new[] { OnNext(0, 1), OnNext(100, 2), OnNext(200, 3), OnCompleted<int>(300) });
    }
}

時間の操作

TestSchedulerでは、AdvanceByやAdvanceToメソッドを使って、自由に時間を進めることができます。

あまり現実的ではないですが、例えば以下のように1時間に一回通知を行うようなクラスがあったとしましょう。

public class PeriodicObservable : IObservable<long>
{
    private readonly IScheduler _scheduler;

    public PeriodicObservable (IScheduler scheduler)
    {
        _scheduler = scheduler;
    }

    public IDisposable Subscribe(IObserver<long> observer)
    {
        return Observable.Interval(TimeSpan.FromHours(1),_scheduler).Subscribe(observer);
    }
}

通知されるまでスリープしていては、テストに時間がかかりすぎてしまいます。

そこで、TestSchedulerを使って時間を操作してしまいましょう。すると、上記のようなクラスのテストも一瞬で実行することができます。

[TestClass]
public class RxTest2 : ReactiveTest
{
    [TestMethod]
    public void 時間がかかるObservableのテスト()
    {
        var testScheduler = new TestScheduler();
        var mockObserver = testScheduler.CreateObserver<long>();

        var testTarget = new PeriodicObservable(testScheduler);

        testTarget.Subscribe(mockObserver);

        // 時間を5時間進める
        testScheduler.AdvanceBy(TimeSpan.FromHours(5).Ticks);
        
        // 5回通知が発生したことを確認
        Assert.AreEqual(mockObserver.Messages.Count, 5);
    }
}

IObserverのテスト

一方、IObserverのテストをしたいときは、任意のタイミングで通知してくれるHotObservable、ColdObservableを使います。
それぞれTestSchedulerのCreateHotObservableとCreateColdObservableメソッドで、通知したい時間と値を指定して作ることができます。
HotとColdの違いについてはneue cc - Reactive Extensions for .NET (Rx) メソッド探訪第7回:IEnumerable vs IObservableが参考になります。

これらのクラスは、TestSchedulerの時間を進めると、指定したタイミングで通知してくれます。
当然ですが時間は進めることしかできず、過去に戻ろうとすると例外が発生します。

IObserverのテストは、こんな感じになります。

[TestClass]
public class RxTest3 : ReactiveTest
{
    [TestMethod]
    public void Observerのテスト()
    {
        var testScheduler = new TestScheduler();

        var list = new List<int>();
        var testTarget = Observer.Create<int>(x => list.Add(x));

        // 通知したい時間と値をセットしておく。
        var hotObservable = testScheduler.CreateHotObservable(OnNext(0, 4), OnNext(100, 5), OnNext(200, 6));

        var d = hotObservable.Subscribe(testTarget);

        // 時間を進めて通知を発生させる
        testScheduler.AdvanceBy(50);
        ReactiveAssert.AreElementsEqual(list, new[] { 4 });

        testScheduler.AdvanceBy(100);
        ReactiveAssert.AreElementsEqual(list, new[] { 4, 5 });

        testScheduler.AdvanceBy(100);
        ReactiveAssert.AreElementsEqual(list, new[] { 4, 5, 6 });

        d.Dispose();
    }
}

これでRx開発も捗りますね!

Reactive Extensionsでロボット制御?

C# Rx

最近、Reactive Extensions(Rx)を使ってプログラミングを書くのがすごく楽しい。

今のところRxの使い道としては、GUIアプリケーションや、Webサービスの非同期呼び出しなどが多いでしょうか。

やさしいFunctional reactive programming(概要編) - maoeのブログによると、Reactive Programmingの応用先としては、以下のようなものがあるそうです。

 * アニメーションやコンピュータミュージックのシグナル処理を自然に書ける
 * イベントドリブンなコードで処理がぶつ切りになってしまうのを避ける
   * GUIプログラミングとか
   * ロボット制御とか

ロボット制御!

そう。いろいろなセンサからの情報を非同期で受け取って、時間のかかる複雑な計算処理を周期的に実行して、アクチュエータの制御をするなんて、まさにReactive Programming向き!?

じゃあ何か制御プログラムを書いてみるか!とは言っても、ハードウェアを用意したりとかは大変・・・。
というわけで、まずは、論文読み: Arrows, Robots, and Functional Reactive Programming http://www.haskell.org/yampa/AFPLectureNotes.pdf - 言語ゲームを参考にして、左右の車輪の速度からロボットの位置を算出するプログラムをRxで書いてみました。

using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

class RxSimBot
{
    static void Main()
    {
        // 車体の幅
        const double length = 1.0;

        // 右車輪の速度を発生させるオブジェクト
        var vrSubject = new Subject<double>();
        // 左車輪の速度を発生させるオブジェクト
        var vlSubject = new Subject<double>();

        // 左車輪の速度と右車輪の速度を結合、ついでに前回との時間差も付与。
        var source = Observable.Zip(vrSubject, vlSubject, (vr, vl) => new { vr, vl })
            .TimeInterval();

        // 車体の向き theta = (1/l) * integral (vr - vl)
        var thetaObserver = source
            .Scan(0.0, (s, v) => s + (v.Value.vr - v.Value.vl) * v.Interval.TotalSeconds)
            .Select(integral => (1.0 / length) * integral);

        // 左右の車輪速度に車体の向きを結合
        var sourceWithTheta = source
            .Zip(thetaObserver, (v, theta) => new { v.Value.vr, v.Value.vl, theta, v.Interval });

        // 車体のX座標 x = (1/2) * integral ((vr + vl) * cos theta)
        var xObserver = sourceWithTheta
            .Scan(0.0, (s, v) => s + (v.vr + v.vl) * Math.Cos(v.theta) * v.Interval.TotalSeconds)
            .Select(integral => (1.0 / 2.0) * integral);

        // 車体のY座標 y = (1/2) * integral ((vr + vl) * sin theta)
        var yObserver = sourceWithTheta
            .Scan(0.0, (s, v) => s + (v.vr + v.vl) * Math.Sin(v.theta) * v.Interval.TotalSeconds)
            .Select(integral => (1.0 / 2.0) * integral);

        // x,y,thetaを結合して結果を表示する。
        xObserver.Zip(yObserver, (x, y) => new { x, y })
            .Zip(thetaObserver, (pos, theta) => new { pos.x, pos.y, theta })
            .Subscribe(Console.WriteLine);

        // 1秒周期でに左右の車輪の速度を与える
        Observable.Interval(TimeSpan.FromSeconds(1))
            .Subscribe(_ =>
            {
                // 左右車輪に同じ値を与えているためyとthetaは変化しない
                vrSubject.OnNext(1);
                vlSubject.OnNext(1);
            });

        Console.ReadKey();
    }
}

Rxを使わない場合は、センサ入力と計算処理を別スレッドに分けて、データの受け渡しを排他制御して、計算処理はfor文になったりするので、それと比べるとだんぜんシンプルですね。
なにより、積分計算がTimeIntervalとScanでサクっと書けるのに感動しました!

なおこのプログラムでは、左右の車輪の速度をSubject.OnNextで与えていますが、実際のロボットの場合はここがセンサ入力になります。

RtStorageリリース

RTM .NET

RtStorageというアプリケーションを公開しました。

zoetrope/RtStorage · GitHub

先日のエントリーでは、利用したライブラリのことを書きましたが、今回はアプリケーションの機能について紹介したいと思います。

RtStorageとは

RtStorageはRTミドルウェア(OpenRTM-aist official website | OpenRTM-aist)のためのデータ記録・再生用ツールです。以下のような特徴を持っています。

  • RTコンポーネントのOutPortから出力されたデータをファイルに記録することができます。
  • 保存したデータを、RTコンポーネントのInPortに対して再生することができます。
  • データの再生は保存したときと同じタイミングで行われます。また、任意の位置から再生を開始することができます。
  • 保存されたデータは、いくつかの検索条件で簡単に見つけ出すことができます。
  • IDLファイルを書かなくても、ユーザが独自定義したデータを扱うことができます。
  • 保存されたデータの解析を行うことができます。

RtStorageと同じような機能を持つアプリケーションとしては、rtshellのrtlogと、そのGUIフロントエンドであるrtlogplayerがあります。

想定ユースケース

RtStorageのユースケースとしては、センサコンポーネントと、センサの計測データを使って計算をするコンポーネントを開発するときを考えてみてください。

計算コンポーネントの動作確認をするために、センサを毎回動かすのはかなりめんどくさいんですよね。

ですので、センサコンポーネントのデータどりを先にしておいて、そのデータを使ってじっくりアルゴリズムの検証をするほうがだんぜん楽。

RTコンポーネントの開発を行っている人は、おそらくダミーのコンポーネントを自作して、同じようなことをしているのではないでしょうか?

RtStorageは、データポートのデータ記録とデータ再生を簡単に行うことができます。

コンセプト

RtStorageのコンセプトとしては「簡単に使えること」と「応答性の向上」を目指しています。(実現できているかどうかはともかく・・・)

インストーラを用意していますので.NET Framwork 4 Client ProfileがインストールされているPCであれば、すぐに使い始めることができます。(一応ZIPアーカイブも用意しています。)別途RTミドルウェアをインストールする必要はありません。

また、RTミドルウェアを使うときにはIDLを扱うのが結構難しいのですが、RtStorageではIDLファイルを使う必要はありません。送られてきたデータをバイナリ配列(CDR形式)のままファイルに保存してるので、データ型にまったく依存せず、ユーザー定義型のデータを扱うことができます。

応答性の向上としては、アプリケーション内でCORBA通信を行う箇所や、データベースアクセスをする箇所を非同期で動作させるようにしています。これにより「アプリケーションが固まった」という感覚を受けることなく使えるのではないかと思います。

興味のある方は、こちらからダウンロードできるので使ってみてください。

Downloads · zoetrope/RtStorage · GitHub

今後

現状でも基本的な機能は実現できているので、今後は細かな機能追加をしていこうかと思っています。
例えば、蓄積したデータの閲覧や編集機能などは欲しいと思っています。
また、自動テストに使うことを考えるとCUIで利用する機能も欲しいですかね。


RTミドルウェアコンテストにだしたら、何か賞が取れたりしたかなぁ。