読者です 読者をやめる 読者になる 読者になる

「Reactive Extensionsでセンサプログラミング」の補足

先日書いたReactive Extensionsでセンサプログラミング - ZOETROPEの日記に、twitter経由で@さんからいくつかコメントをいただきました。
さらに、neue cc - RxとパフォーマンスとユニットテストとMoles再びという素敵な記事も!

というわけで、補足記事を書いておきます。

ToList

改行コードがくるまでのデータをリストにまとめる処理をAggregateを使って以下のように書いていました。

sensor.TakeWhile(x => x != '\n')
      .Aggregate(new List<int>(), (l, i) => { l.Add(i); return l; }));

これは、ToListやToArrayで置き換え可能です。

sensor.TakeWhile(x => x != '\n').ToList();

ToListのほうが可読性も高くていいですね。(Windows Phone 7版のReactive Extensionsでは使えないそうですが。)

イベントの解除漏れ

ObservableSerialPortでは、EOFを受信したときやエラーが発生したときにOnCompletedやOnErrorを呼び出していますが、その後にイベントの登録解除をしていないため、イベントの解除漏れが発生してしまいます。

以下のようにCompositeDisposableを使って、明示的に登録解除する必要があります。

public IDisposable Subscribe(IObserver<byte> observer)
{
    if (observer == null) throw new ArgumentNullException("observer");

    var disposable = new CompositeDisposable();

    // 受信イベントが発生したときの処理
    var received = _serialPort.DataReceivedAsObservable()
        .Subscribe(e => {
            if (e.EventType == SerialData.Eof)
            {
                observer.OnCompleted();
                disposable.Dispose();
            }
            else
            {
                var buf = new byte[_serialPort.BytesToRead];
                var len = _serialPort.Read(buf, 0, buf.Length);

                // 受信したデータを1バイトずつObserverに通知する
                Observable.Range(0, len).ForEach(i => observer.OnNext(buf[i]));
            }
        });

    // エラーイベントが発生したときの処理
    var error = _serialPort.ErrorReceivedAsObservable()
        .Subscribe(e => {
            observer.OnError(new Exception(e.EventType.ToString()));
            disposable.Dispose();
        });

    disposable.Add(received);
    disposable.Add(error);

    // Disposeが呼ばれたらイベント登録を解除する
    return disposable;
}

neueccさんの記事ではさらにスマートな実装になっています。
TakeWhile(e => e.EventType != SerialData.Eof)でEOFがきた時点でOnCompletedが呼ばれるようにしてあり、さらにDataReceivedとErrorReceivedをTakeUntilで結合することで、エラーが発生したらDataReceivedも解除されるようになっています。
こういう実装が思いつけるようになりたい・・・

TakeUntilの挙動については、Reactive Extensions再入門 その8「SkipとTakeメソッド」 - かずきのBlog@hatenaが分かりやすいです。

拡張メソッド

ObservableSerialPortのようにIObservableを実装したクラスを用意するか、neueccさんのように拡張メソッドを用意するか、ちょっと悩むところです。

ただ、今回の場合はSerialPortのOpenやSendをObservableSerialPortに移譲させるのも面倒なので、拡張メソッドのほうがいいんじゃないかと思います。

Moles

実センサを持っていたとしても、センサを準備するのが面倒だったり、わざとエラーを起こすことは難しかったりするので、モックを使ったテストはとても有効です。

neueccさんのコードをベースに具体的なデータを受信するテストを書いてみました。

Rx-Testingの使い方は、Rx-Testingの使い方 - ZOETROPEの日記をみて思い出したり。

[TestClass]
public class ObservableSerialPortTest : ReactiveTest
{
    [TestMethod, HostType("Moles")]
    public void データが受信できることを確認()
    {
        // EventArgsを捏造
        var chars = new MSerialDataReceivedEventArgs() {EventTypeGet = () => SerialData.Chars};

        // SerialPort::BytesToRead/SerialPort::Readで、計測データを返すようにする。
        MSerialPort.AllInstances.BytesToReadGet = (self) => _data.Length;
        MSerialPort.AllInstances.ReadByteArrayInt32Int32 = (self, buffer, offset, count) => {
            var len = _data.Length < count ? _data.Length : count;
            Array.Copy(_data, 0, buffer, offset, len);
            return len;
        };

        // SerialPort::Openでは何もしない
        MSerialPort.AllInstances.Open = self => { };

        var scheduler = new TestScheduler();
        var observer = scheduler.CreateObserver<byte[]>();

        // 時間10で受信イベントを発行
        MSerialPortExtensions.DataReceivedAsObservableSerialPort = _ => scheduler.CreateHotObservable(
            OnNext(10, chars))
            .Select(x => (SerialDataReceivedEventArgs)x);

        var serialPort = new ObservableSerialPort("COM1");
        serialPort.Subscribe(observer);

        // 時間を10進める
        scheduler.AdvanceTo(10);

        observer.Messages[0].Time.Is(10);
        observer.Messages[0].Value.Value.Is(_data);
    }

    private readonly byte[] _data = Encoding.ASCII.GetBytes(
        @"GD0044072501" + "\n" +
        @"00P" + "\n" +
        @"0DKO>" + "\n" +
        @"00i00i00i00i00k00k00n01101101101101101101100o00m00o00o0130130140]" + "\n" +
        @"14012012014015017017017016017017016016015015015014014014014015010" + "\n" +
        @"501801<01<01?01D01D01D01F01F01L01O01R01T01V01W01X01X01X01Z01Z01Ze" + "\n" +
        @"01\01b01j02;02`09H09H09Z09Z09_0:90:90:@0:@0:@0:;0:@0:;0:;0:90:90]" + "\n" +
        @"9Z08X08408408408608608608408408408408908908908908808608308008008V" + "\n" +
        @"007m07m07j07h07h07h07d06E04D04>04=04=04>04C04H04H04I04J04K04U04Ue" + "\n" +
        @"04X04X04X04W04W04W04W04[04]04_04`04`04h04l04l04n05005005305;05>0N" + "\n" +
        @"5D05F05J05M05Q05T05W05[05]05^05`05f05f05m05n065065065068065065060" + "\n" +
        @"906:06:06;06<06>06A06L06L06N06S06T06d07S07[09D0hH0hH0hH0gO0fk0fDV" + "\n" +
        @"0eg0eU0e@0db0db0db0000000000000000000000000a40`N0_o0_`0_G0_=0^a0^" + "\n" +
        @"^I0^<0]h0]W0]@0]00\X0\L0[l0[f0[S0[?0[00Zi0ZJ0ZC0Z70Z70Z70Z90Z90Z2" + "\n" +
        @"90Z40Z00XR0XR0XR0XR0XR0XO0XD0W]0VT0VT0VM0V;0Um0Uc0U]0UQ0UJ0UC0U9Y" + "\n" +
        @"0Te0Tc0T^0TK0T=0T70T60Sm0Sf0Sf0SR0SO0SD0S?0S70Rn0Rh0Rh0Rd0R]0RK0Y" + "\n" +
        @"RD0RD0R70R60R20Qo0Qb0Q^0Q\0QV0QL0QI0QH0QC0Q50Q40Q30Po0Pk0Pi0Pg0PD" + "\n" +
        @"a0P[0PR0PR0PQ0PI0PI0PG0PB0PB0P@0P?0P:0P90P00P00Oh0Od0Oc0Oc0O`0O_[" + "\n" +
        @"0O]0O]0OZ0OZ0OZ0OZ0OZ0O[0O[0Og0PO0PO0PO0PL0PL0PI0P90P90O_0OP0OP0k" + "\n" +
        @"OP0Od0P50P50P>0PG0PG0PC0PC0PC0Oa0OH0OH0OH0OJ0OK0OL0OK0OK0OL0OP0O1" + "\n" +
        @"Q0OQ0OQ0OR0OR0OT0OT0OU0OZ0OZ0O[0O[0O\0O]0Oc0Oc0Oc0Od0On0On0Oo0OoY" + "\n" +
        @"0P40P40P80P=0PC0PE0PE0PE0PN0PN0PP0PX0P`0Pb0Pg0Ph0Pm0Q90Q90Q90Q?0?" + "\n" +
        @"QC0QF0QI0QM0Q[0Qa0Qc0Qi0R20R20R=0RA0RG0RO0RR0RX0R]0Rj0S10S20S90ST" + "\n" +
        @"@0SJ0SP0SS0Sa0Sk0T80T:0T>0TI0TN0T]0Ta0Tl0U40U;0UN0UR0UV0Ul0V20V?5" + "\n" +
        @"0VC0VQ0Va0W30W50WH0Xg0Xn0Xn0Xm0Xm0Xm0Xm0Z30Z<0Zb0Zb0Zb0ZW0ZW0ZW0E" + "\n" +
        @"ZX0[20[50[S0[a0\;0\G0\V0\c0]=0]T0]a0^00^E0^[0^k0_J0_Y0`30`E0`Y0`2" + "\n" +
        @"g0aE0aW0al0bK0b\0c10cH0ck0d;0dS0dg0eF0ek0fE0f_0g?0g]0h;0iV0j`0jaW" + "\n" +
        @"0jc0lY0l]0la0le0m>0mn0n[0oQ10110i11512W0000070000000000000000000L" + "\n" +
        @"000000071?d1?d1Af1Af1B800000000000000000000000000000000000000000j" + "\n" +
        @"00000000000000000000000000000000000000000000000000000000000000000" + "\n" +
        @"00000000000000005@05905905905805304m04N03P03F03@02n03202i02b02Y0U" + "\n" +
        @"2:02101h01h01h01d01m01n01o02002002001i01d01d01d01l01l01l01l01l01m" + "\n" +
        @"l01o01o01o01o02102102102101k01k01k01k01k01h01_01S01Q01P01P01P01PW" + "\n" +
        @"01O01Q01O01N01N01N01N01M01M01G01I01H01G01H01H01G01E01A01>01=01=0J" + "\n" +
        @"1=01:0180170170160170180180180190190190190170140140140140140139" + "\n\n");
}

任意のタイミングでイベントを発行できるので素敵ですね。実センサでは発生しにくいパターンのテストも簡単に書けそうです。最初からこうやってテストを書いておけばよかったと反省・・・

Molesは以前使ったときに、テストの実行が非常に重い印象があったのですが、今回はそうでもありませんでした。以前使ったときのPCのスペックが低かっただけのようです。

ちなみに、ReSharper6.0のテストランナーだとMolesのテストが動かないバグがありました。6.1にアップグレードすればOKです。