「Reactive Extensionsでセンサプログラミング」の補足
先日書いたReactive Extensionsでセンサプログラミング - ZOETROPEの日記に、twitter経由で@neueccさんからいくつかコメントをいただきました。
さらに、neue cc - RxとパフォーマンスとユニットテストとMoles再びという素敵な記事も!
というわけで、補足記事を書いておきます。
ToList
改行コードがくるまでのデータをリストにまとめる処理をAggregateを使って以下のように書いていました。
sensor.TakeWhile(x => x != '\n') .Aggregate(new List<int>(), (l, i) => { l.Add(i); return l; }));
これは、ToListやToArrayで置き換え可能です。
sensor.TakeWhile(x => x != '\n').ToList();
ToListのほうが可読性も高くていいですね。(Windows Phone 7版のReactive Extensionsでは使えないそうですが。)
イベントの解除漏れ
ObservableSerialPortでは、EOFを受信したときやエラーが発生したときにOnCompletedやOnErrorを呼び出していますが、その後にイベントの登録解除をしていないため、イベントの解除漏れが発生してしまいます。
以下のようにCompositeDisposableを使って、明示的に登録解除する必要があります。
public IDisposable Subscribe(IObserver<byte> observer) { if (observer == null) throw new ArgumentNullException("observer"); var disposable = new CompositeDisposable(); // 受信イベントが発生したときの処理 var received = _serialPort.DataReceivedAsObservable() .Subscribe(e => { if (e.EventType == SerialData.Eof) { observer.OnCompleted(); disposable.Dispose(); } else { var buf = new byte[_serialPort.BytesToRead]; var len = _serialPort.Read(buf, 0, buf.Length); // 受信したデータを1バイトずつObserverに通知する Observable.Range(0, len).ForEach(i => observer.OnNext(buf[i])); } }); // エラーイベントが発生したときの処理 var error = _serialPort.ErrorReceivedAsObservable() .Subscribe(e => { observer.OnError(new Exception(e.EventType.ToString())); disposable.Dispose(); }); disposable.Add(received); disposable.Add(error); // Disposeが呼ばれたらイベント登録を解除する return disposable; }
neueccさんの記事ではさらにスマートな実装になっています。
TakeWhile(e => e.EventType != SerialData.Eof)でEOFがきた時点でOnCompletedが呼ばれるようにしてあり、さらにDataReceivedとErrorReceivedをTakeUntilで結合することで、エラーが発生したらDataReceivedも解除されるようになっています。
こういう実装が思いつけるようになりたい・・・
TakeUntilの挙動については、Reactive Extensions再入門 その8「SkipとTakeメソッド」 - かずきのBlog@hatenaが分かりやすいです。
拡張メソッド
ObservableSerialPortのようにIObservable
ただ、今回の場合はSerialPortのOpenやSendをObservableSerialPortに移譲させるのも面倒なので、拡張メソッドのほうがいいんじゃないかと思います。
Moles
実センサを持っていたとしても、センサを準備するのが面倒だったり、わざとエラーを起こすことは難しかったりするので、モックを使ったテストはとても有効です。
neueccさんのコードをベースに具体的なデータを受信するテストを書いてみました。
Rx-Testingの使い方は、Rx-Testingの使い方 - ZOETROPEの日記をみて思い出したり。
[TestClass] public class ObservableSerialPortTest : ReactiveTest { [TestMethod, HostType("Moles")] public void データが受信できることを確認() { // EventArgsを捏造 var chars = new MSerialDataReceivedEventArgs() {EventTypeGet = () => SerialData.Chars}; // SerialPort::BytesToRead/SerialPort::Readで、計測データを返すようにする。 MSerialPort.AllInstances.BytesToReadGet = (self) => _data.Length; MSerialPort.AllInstances.ReadByteArrayInt32Int32 = (self, buffer, offset, count) => { var len = _data.Length < count ? _data.Length : count; Array.Copy(_data, 0, buffer, offset, len); return len; }; // SerialPort::Openでは何もしない MSerialPort.AllInstances.Open = self => { }; var scheduler = new TestScheduler(); var observer = scheduler.CreateObserver<byte[]>(); // 時間10で受信イベントを発行 MSerialPortExtensions.DataReceivedAsObservableSerialPort = _ => scheduler.CreateHotObservable( OnNext(10, chars)) .Select(x => (SerialDataReceivedEventArgs)x); var serialPort = new ObservableSerialPort("COM1"); serialPort.Subscribe(observer); // 時間を10進める scheduler.AdvanceTo(10); observer.Messages[0].Time.Is(10); observer.Messages[0].Value.Value.Is(_data); } private readonly byte[] _data = Encoding.ASCII.GetBytes( @"GD0044072501" + "\n" + @"00P" + "\n" + @"0DKO>" + "\n" + @"00i00i00i00i00k00k00n01101101101101101101100o00m00o00o0130130140]" + "\n" + @"14012012014015017017017016017017016016015015015014014014014015010" + "\n" + @"501801<01<01?01D01D01D01F01F01L01O01R01T01V01W01X01X01X01Z01Z01Ze" + "\n" + @"01\01b01j02;02`09H09H09Z09Z09_0:90:90:@0:@0:@0:;0:@0:;0:;0:90:90]" + "\n" + @"9Z08X08408408408608608608408408408408908908908908808608308008008V" + "\n" + @"007m07m07j07h07h07h07d06E04D04>04=04=04>04C04H04H04I04J04K04U04Ue" + "\n" + @"04X04X04X04W04W04W04W04[04]04_04`04`04h04l04l04n05005005305;05>0N" + "\n" + @"5D05F05J05M05Q05T05W05[05]05^05`05f05f05m05n065065065068065065060" + "\n" + @"906:06:06;06<06>06A06L06L06N06S06T06d07S07[09D0hH0hH0hH0gO0fk0fDV" + "\n" + @"0eg0eU0e@0db0db0db0000000000000000000000000a40`N0_o0_`0_G0_=0^a0^" + "\n" + @"^I0^<0]h0]W0]@0]00\X0\L0[l0[f0[S0[?0[00Zi0ZJ0ZC0Z70Z70Z70Z90Z90Z2" + "\n" + @"90Z40Z00XR0XR0XR0XR0XR0XO0XD0W]0VT0VT0VM0V;0Um0Uc0U]0UQ0UJ0UC0U9Y" + "\n" + @"0Te0Tc0T^0TK0T=0T70T60Sm0Sf0Sf0SR0SO0SD0S?0S70Rn0Rh0Rh0Rd0R]0RK0Y" + "\n" + @"RD0RD0R70R60R20Qo0Qb0Q^0Q\0QV0QL0QI0QH0QC0Q50Q40Q30Po0Pk0Pi0Pg0PD" + "\n" + @"a0P[0PR0PR0PQ0PI0PI0PG0PB0PB0P@0P?0P:0P90P00P00Oh0Od0Oc0Oc0O`0O_[" + "\n" + @"0O]0O]0OZ0OZ0OZ0OZ0OZ0O[0O[0Og0PO0PO0PO0PL0PL0PI0P90P90O_0OP0OP0k" + "\n" + @"OP0Od0P50P50P>0PG0PG0PC0PC0PC0Oa0OH0OH0OH0OJ0OK0OL0OK0OK0OL0OP0O1" + "\n" + @"Q0OQ0OQ0OR0OR0OT0OT0OU0OZ0OZ0O[0O[0O\0O]0Oc0Oc0Oc0Od0On0On0Oo0OoY" + "\n" + @"0P40P40P80P=0PC0PE0PE0PE0PN0PN0PP0PX0P`0Pb0Pg0Ph0Pm0Q90Q90Q90Q?0?" + "\n" + @"QC0QF0QI0QM0Q[0Qa0Qc0Qi0R20R20R=0RA0RG0RO0RR0RX0R]0Rj0S10S20S90ST" + "\n" + @"@0SJ0SP0SS0Sa0Sk0T80T:0T>0TI0TN0T]0Ta0Tl0U40U;0UN0UR0UV0Ul0V20V?5" + "\n" + @"0VC0VQ0Va0W30W50WH0Xg0Xn0Xn0Xm0Xm0Xm0Xm0Z30Z<0Zb0Zb0Zb0ZW0ZW0ZW0E" + "\n" + @"ZX0[20[50[S0[a0\;0\G0\V0\c0]=0]T0]a0^00^E0^[0^k0_J0_Y0`30`E0`Y0`2" + "\n" + @"g0aE0aW0al0bK0b\0c10cH0ck0d;0dS0dg0eF0ek0fE0f_0g?0g]0h;0iV0j`0jaW" + "\n" + @"0jc0lY0l]0la0le0m>0mn0n[0oQ10110i11512W0000070000000000000000000L" + "\n" + @"000000071?d1?d1Af1Af1B800000000000000000000000000000000000000000j" + "\n" + @"00000000000000000000000000000000000000000000000000000000000000000" + "\n" + @"00000000000000005@05905905905805304m04N03P03F03@02n03202i02b02Y0U" + "\n" + @"2:02101h01h01h01d01m01n01o02002002001i01d01d01d01l01l01l01l01l01m" + "\n" + @"l01o01o01o01o02102102102101k01k01k01k01k01h01_01S01Q01P01P01P01PW" + "\n" + @"01O01Q01O01N01N01N01N01M01M01G01I01H01G01H01H01G01E01A01>01=01=0J" + "\n" + @"1=01:0180170170160170180180180190190190190170140140140140140139" + "\n\n"); }
任意のタイミングでイベントを発行できるので素敵ですね。実センサでは発生しにくいパターンのテストも簡単に書けそうです。最初からこうやってテストを書いておけばよかったと反省・・・
Molesは以前使ったときに、テストの実行が非常に重い印象があったのですが、今回はそうでもありませんでした。以前使ったときのPCのスペックが低かっただけのようです。
ちなみに、ReSharper6.0のテストランナーだとMolesのテストが動かないバグがありました。6.1にアップグレードすればOKです。