Reactive ExtensionsをRTミドルウェアに適用してみる

先日のエントリーでは、Reactive Extensionsを使って、ロボットの位置を算出するプログラムを書いてみました。

しかし位置を算出するだけではあまり実用的ではありません。そこで今回は、RTミドルウェアにReactive Extensionsを適用することを考えてみます。

RTミドルウェアは、ロボットソフトウェアの機能(センサやアクチュエータやアルゴリズム)をRTコンポーネントと呼ばれる単位でモジュール化し、それらをネットワークを介して組み合わせてロボットシステムを構築するためのフレームワークです。

RTコンポーネントは、自身の状態に応じて周期的な処理を実行し、データポートやサービスポートなどの仕組みを利用して、他のRTコンポーネントと通信を行います。
RTコンポーネントでは、状態に応じた周期処理、他コンポーネントとの通信、センサの監視など、それぞれ非同期で動作する処理が多くあり、これらを組み合わせて制御するところにReactive Extensionsがマッチするのではないかと考えてみました。


というわけで、Reactive Extensionsを活用するとRTコンポーネントがこんな風に実装できるよというのを紹介します。
なお、C++で実装されたOpenRTM-aistとC#のコードを比較しているので、ちょっと分かりにくいですがご容赦を。

データポートの同期

まずデータポートの同期について考えてみます。

RTコンポーネントでは、複数のRTコンポーネントからのデータ入力を待ち受けるということがよくあります。

OpenRTM-aistでは、複数のデータ入力を待ちうける場合、公式ページにも書かれているように、on_executeのなかでisNew()でデータの有無を周期的にチェックするのが一般的でしょう。

 if (inport1.isNew() && inport2.isNew()) {
   indata1 = inport1.read();
   indata2 = inport2.read();
   
   // データを使った処理
   
   outport.write(outdata);
 }

しかしこの方法では、2つのデータが揃った瞬間ではなく、RTコンポーネントの動作周期のタイミングでしかデータが揃ったことをチェックできません。

Reactive Extensionsを使うと、2つの入力ポートのデータが揃うまで待ち、そのデータの和を出力ポートに書き込む処理を以下のように簡単に書くことができます。

    Observable.Zip(inport1, inport2, (indata1, indata2) => indata1 + indata2 )
      .Subscribe(outport);

Zip以外にも、MergeやCombineLeastなどの合成系のメソッドが使えますし、WhereでデータをフィルタリングしたりSelectで加工することもできます。

RTコンポーネントの活性化

OpenRTM-aistでは、RTコンポーネントを活性化したい場合、通常は以下のようなコードを書くと思います。

  ExecutionContext_ptr ec = comp->get_context(0);
  ec->activate_component(comp);

しかし、activate_component()メソッドはイベントを送るだけなので、実際にACTIVE状態に変わったかどうか分かりません。
そのため、以下のようにget_component_state()を何度か呼んで状態が変化するまで待つ必要があります。

  comp->activate(0);

  ExecutionContext_ptr ec = comp->get_context(0);

  while(true) {
    if( ec->get_component_state(comp) == ACTIVE_STATE ) break;
    Sleep(1);
  }

しかしこの方法では、get_component_stateで何度も通信が行われますし、スリープの時間によっては待ち時間が大きくなってしまいます。


OpenRTM-aistでは、ver.1.1からコンポーネントオブザーバという機能が実験的に追加されています。
この機能を利用すると、RTコンポーネントの状態変化などの通知を受け取ることができます。

OpenRTM-aistでコンポーネントオブザーバの機能を利用するには、rtc.confに以下の記述を追加して、RTコンポーネントを起動するだけです。(Windows版の場合)

# ComponentObserverConsumer.dllのパスを指定
manager.modules.load_path: ../ext/sdo/
manager.modules.preload:ComponentObserverConsumer.dll
sdo.service.provider.enabled_services: ALL


さて、コンポーネントオブザーバを有効にしたRTコンポーネントを、Reactive Extensionsを使って処理してみます。(少々分かりにくいですが、OpenRTM-aistのC++版のRTコンポーネントを、C#のプログラムで監視しているということです)

コンポーネントオブザーバからの状態変化通知をStateChangedAsObservableで受け取れるようにすると、RTコンポーネントを活性化して、状態がACTIVEになるまで待つという処理を以下のように書くことができます。

    // コンポーネントの状態がACTIVEになった通知をキャッシュする
    var obs = comp.StateChangedAsObservable()
      .Where(state => state == LifeCycleState.ACTIVE_STATE)
      .PublishLast();
    obs.Connect();
    
    // コンポーネントを活性化
    comp.ActivateAsync().First();
    
    // コンポーネントの状態がACTIVEになるまで待つ
    obs.First();

ここで、もしRTコンポーネントの活性化に失敗して状態が変化しなかった場合、obs.First()のところで待ち続けることになってしまいます。

Reactive Extensionsでは、以下のようにTimeoutメソッドを使ってタイムアウトを設定することができます。あとは、例外をcatchするなり、Catchメソッドで処理するなり、SubscribeのOnErrorで処理するなりしましょう。

    obs.Timeout(TimeSpan.FromSeconds(5)).First();

スレッド間同期

RTコンポーネントでは、一定の周期で動作する処理や、他のコンポーネントから非同期で呼びだされるデータポートやサービスポート、センサの監視やアクチュエータの制御などがそれぞれ別のスレッドで動作しています。

これを排他制御して動作させるのは非常に難しいのですが、Reactive ExtensionsではSchedulerという仕組みがあるので、スレッド間の同期がだいぶやりやすくなります。

例えば、以下のようにRTコンポーネントにExecutionContextSchedulerを持たせ、周期的に動作するOnExecuteと、非同期で呼び出されるデータポートの処理を同じSchedulerで動作させるようにします。
ExecutionContextSchedulerは、デフォルトでEventLoopSchedulerを使うようにしてあるので、OnExecuteとデータポートの処理が必ず1つのスレッド上で動作されるようになります。ロックを書く必要はありません。

class ECSchedulerTest : ReactiveComponent
{
    public ReactiveInPort<TimedLong> InPort { get; private set; }
    private IDisposable _disposer;

    public ECSchedulerTest()
        : base("ECSchedulerTest")
    {
        InPort = new ReactiveInPort<TimedLong>("in");
        AddPort(InPort);
    }

    protected override ReturnCode_t OnActivated(int execHandle)
    {
        _disposer = InPort
            .ObserveOn(ExecutionContextScheduler) // 以下の処理をExecutionContextSchedulerで実行するように切り替え
            .Subscribe(_ => { /* 排他制御の必要な処理 */ });
        
        return ReturnCode_t.RTC_OK;
    }

    // OnExecuteは、ExecutionContextSchedulerにより一定の周期で呼び出される
    protected override ReturnCode_t OnExecute(int execHandle)
    {
        /* 排他制御の必要な処理 */
        return ReturnCode_t.RTC_OK;
    }

    protected override ReturnCode_t OnDeactivated(int execHandle)
    {
        _disposer.Dispose();

        return ReturnCode_t.RTC_OK;
    }
}

RTコンポーネントのテスト

先日、Rx-Testingの使い方を紹介しましたが、Rx-Testingを使えばRTコンポーネントのテストも簡単に書けるようになります。

入力ポートに入れるデータは、入力するタイミングを任意に指定することができますし、OnExecuteの周期処理も実際の時間を待たなくても実行することができます。

例えば、入力ポートから入ってきたデータを2倍して出力するコンポーネントのテストは以下のようになります。

[TestClass]
public class ComponentTest : ReactiveTest
{
    [TestMethod]
    public void TargetComponentの入出力チェック()
    {
        var scheduler = new TestScheduler();

        // OutPortから出力されたデータを蓄えるためのもの。
        var recorder = scheduler.CreateObserver<TimedLong>();

        // InPortに入力するデータ。1秒ごとにデータを送る。
        var publisher = scheduler.CreateHotObservable(
            OnNext(TimeSpan.FromSeconds(1).Ticks, new TimedLong() {data = 1}),
            OnNext(TimeSpan.FromSeconds(2).Ticks, new TimedLong() {data = 2}),
            OnNext(TimeSpan.FromSeconds(3).Ticks, new TimedLong() {data = 3}),
            OnNext(TimeSpan.FromSeconds(4).Ticks, new TimedLong() {data = 4}),
            OnNext(TimeSpan.FromSeconds(5).Ticks, new TimedLong() {data = 5})
            );

        var comp = new TargetComponent();

        // コンポーネントのスケジューラをTestSchedulerに差し替える
        comp.ExecutionContextScheduler = scheduler;

        // 入出力ポートに接続
        comp.InPort.Connect(publisher);
        comp.OutPort.Connect(recorder);

        // コンポーネントを活性化
        var retRecorder = scheduler.CreateObserver<ReturnCode_t>();
        comp.ActivateAsync().Subscribe(retRecorder);
        scheduler.AdvanceBy(100);
        retRecorder.Messages.Count.Is(2);
        retRecorder.Messages.First().Value.Value.Is(ReturnCode_t.RTC_OK);

        // 時間を5秒進める
        scheduler.AdvanceBy(TimeSpan.FromSeconds(5).Ticks);

        // 入力データの値が2倍されて出力されていることを確認
        recorder.Messages.Count.Is(5);
        ReactiveAssert.AreElementsEqual(
            recorder.Messages.Select(x => x.Value.Value),
            new[] {
                    new TimedLong() {data = 2},
                    new TimedLong() {data = 4},
                    new TimedLong() {data = 6},
                    new TimedLong() {data = 8},
                    new TimedLong() {data = 10}
                });
    }
}

ReactiveRTM

ここまでに紹介した機能を持つRTミドルウェアとして、ReactiveRTMという名前でプロトタイプ実装を作ってみました。

zoetrope/ReactiveRTM · GitHub

一応、OpenRTM-aist-1.1やRTSystemEditorと連携させて動作させることはできますが、実装していない機能も多いので実用には程遠いです。(NuGetに登録もしたいのですが、あまり完成度の高くないものを登録するのもよくないかなと)

ちなみに、GUIアプリケーションとの連携サンプルや、IDLを書かずに独自のデータ型でデータポートを使ったり、C#のdynamicを使ってdRubyのようにサービスポートを呼び出すようなサンプルもあるので、興味があれば見てみてください。

そういえば、Erlang版のOpenRTMなどもありますが、コンセプトは近いのかもしれませんね。