UniRx 知らないと人権が無い気がしてきたので勉強した。

最終更新日



はじめに

最近、Unity を触る機会が多いため Unity の関するキーワードを勉強しています。
今回の投稿はその一環として UniRx を勉強してみました。自分用のメモなので読みにくいかと思いますが参考に出来る方がいれば幸いです。

参考:UniRx入門シリーズ 目次
参考:UniRxを導入するメリット ~こういう時にUniRxは使えるよ~
参考:未来のプログラミング技術をUnityで -UniRx-

UniRxとは

「UniRx入門シリーズ 目次」さんから引用させていただきました。

UniRxとは、neueccさんが作成されているReactive Extensions for Unityなライブラリです。

Reactive Extensions(以下Rx)は、要点だけ箇条書きすると次のようなライブラリとなっています。
・MicrosoftResearchが開発していたC#向け非同期処理向けライブラリ
・デザインパターンの1つ、Observerパターンをベースに設計されている
・時間に関係した処理や、実行タイミングが重要となる処理を簡単に記述できるようになっている
・完成度が高く、Java,JavaScript,Swiftなど様々な言語に移植されている

つまり、「Reactive Extensions」のUnity実装と言う事になりそうです。
「Observerパターンをベース」との事で、C# の event が多々でてくるたいです。

Unityアセット

UniRx - Reactive Extensions for Unity

UniRx – Reactive Extensions for Unity (無料)

定義と購読

  • event(C#) の上位互換 Subject を使う。
public event TimerEventHandler OnTimeChanged;
    ↓
private Subject<int> OnTimeChanged = new Subject<int>();
  • 「event +=」 の代わりに 「Subscribe」で購読する。
timeCounter.OnTimeChanged += time => {  };
    ↓
timeCounter.OnTimeChanged.Subscribe(time => {  });
  • 「event()」 の代わりに 「OnNext」 で発行する。
OnTimeChanged(time);
    ↓
OnTimeChanged.OnNext(time);
  • 「Subject」が実装しているインターフェース「IObserver」には3種類ある。
    • OnNext:上記説明
    • OnError:発生したエラー(Exception)を通知するメッセージを発行するメソッド
    • OnCompleted:メッセージの発行が完了したことを通知するメソッド
  • 「Subject」が実装しているインターフェース「IObservable」には1種類あるがいくつかの書き方が出来る。
    購読にはいくつかのパターンで書ける
//OnNextのみ
subject.Subscribe(msg => Debug.Log("Subscribe1:" + msg));

//OnNext & OnError
subject.Subscribe(
    msg => Debug.Log("Subscribe1:" + msg),
    error => Debug.LogError("Error" + error));

//OnNext & OnCompleted
subject.Subscribe(
    msg => Debug.Log("Subscribe1:" + msg),
    () => Debug.Log("Completed"));

//OnNext & OnError & OnCompleted
subject.Subscribe(
    msg => Debug.Log("Subscribe1:" + msg),
    error => Debug.LogError("Error" + error),
    () => Debug.Log("Completed"));
  • 「subject.Subscribe」は Javascript ES6 の Promise と同じような概念だった。

フィルター

  • 「subject」は Linq のような書き方ができる。
subject.
  .Where(x => x == "Enemy") //←フィルタリングオペレータ
  .Subscribe(x => Debug.Log(string.Format("プレイヤが{0}に衝突しました", x)));
  • 他にも以下がある。
    • フィルタリングする「Where」
    • メッセージ変換する「Select]
    • 重複を排除する「Distinct」
    • 一定個数まとまるまで待つ「Buffer」
    • 短時間にまとめてきた場合に先頭のみを使う「ThrottleFirst」
  • 値が無い場合の書き方
var subject = new Subject<Unit>();
subject.Subscribe(x => Debug.Log(x));
subject.OnNext(Unit.Default);

購読終了

  • 手動で購読を終了(Dispose)する
var subject = new Subject<int>();
var disposable = subject.Subscribe(x => Debug.Log(x), () => Debug.Log("OnCompleted"));
disposable.Dispose();
  • 指定の GameObject が破棄されたら購読を終了(AddTo)する
    _timeCounter.OnTimeChanged
        .Where(x => x == 0) //タイマが0になった時のみ実行
        .Subscribe(_ =>
        {
            //タイマが0になったら初期座標に戻る
            transform.position = Vector3.zero;
        }).AddTo(gameObject); //指定のgameObjectが破棄されたらDisposeする

ストリームの種類

1. Subject<T>  上記で説明
2. BehaviorSubject<T>  最後に発行された物をキャッシュして、1つ前の値を発行する。
3. ReplaySubject<T>  過去全ての値を発行する。
4. AsyncSubject<T>  OnCompletedが実行されたタイミングで、最後のOnNextのみ発行
5. ReactiveProperty<T>  変数1に対してSubjectを実装する。
6. ReactiveCollection<T>  配列に対してSubjectを付けたもの。
7. ReactiveDictionary<T1,T2>  連想配列に対してSubjectを付けたもの。
8. ObservableWWW  指定URL実行して、Subscribe できる。
9. Observable.NextFrame  次フレームで Subscribe できる。
10. Observable.EveryUpdate   よくわからない
11. ObserveEveryValueChanged  指定の変数が変更されたら Subscribe される。
  • Update/Destroy をストリームとして扱う。
        this.UpdateAsObservable()
            .Subscribe(
                _ => Debug.Log("Update!"), //OnNext
                () => Debug.Log("OnCompleted") //OnCompleted
            );

        // OnDestoryを受けてログに出す
        this.OnDestroyAsObservable()
            .Subscribe(_ => Debug.Log("Destroy!"));

コルーチン

コルーチンをストリームに取り込む

public class ConvertFromCoroutine : MonoBehaviour
{
    void Start()
    {
        Observable.FromCoroutine(NantokaCoroutine, publishEveryYield: false)
            .Subscribe(
                _ => Debug.Log("OnNext"),
                () => Debug.Log("OnCompleted")
            ).AddTo(gameObject);
    }

    IEnumerator NantokaCoroutine()
    {
        Debug.Log("Coroutine started.");

        //なんか処理して待ち受ける的な
        yield return new WaitForSeconds(3);

        Debug.Log("Coroutine finished.");
    }
}

複数のコルーチンを並列に実行する

    Observable.WhenAll(
        Observable.FromCoroutine<string>(o => CoroutineA(o)),
        Observable.FromCoroutine<string>(o => CoroutineB(o))
    ).Subscribe(xs =>
    {
        foreach (var x in xs)
        {
            Debug.Log("result:" + x);
        }
    });

スケジューラ

Scheduler は「いつ」「どのように」処理を実行するかを指定できる

        Scheduler.ThreadPool.Schedule(() => Debug.Log("別スレッドで実行"));
        Scheduler.Immediate.Schedule(() => Debug.Log("すぐに実行"));
        Scheduler.MainThreadEndOfFrame.Schedule(() => Debug.Log("フレーム終了後に実行"));

所感(まとめ)

ストリーム(変化)をきっかけにフィルター(絞り込んで)、実行(処理)を行う事によって完結に書ける点は Linq のような感じで書けるのがすべらしいかなと感じました。

また、Unity の GameObject の破棄をトリガーに購読を破棄できる、Update/Destroy をストリームとして扱うのは Unity/UniRx らしさが出ていて親和性があり素晴らしいかと思います。

UniRx が多少重いらしいのですが Unity で問題になりやすいのは GPU 周りが多いかと思いますので積極的に採用していこうかと思います。
Unity 2018 の 「C# Job System」「ECS(Entity Component System)」あたりも UniRx に適用可能なんじゃないかなと考えてみたりしています。







よければ、SNSにシェアをお願いします!