BETA

Reactive Extentionsを使って処理件数通知など

投稿日:2019-11-08
最終更新:2019-11-10

ReactiveExtentionsを使ったWindowsFormのサンプルプログラムです。
github

Nugetからインストール

  • Rxを使用するためSystem.Reactiveをインストールします

マウスイベントをストリーム化してお絵かき

  • Rxではイベントをストリーム化して扱えます。
    ドラッグ処理に使用するマウスダウン、マウスアップ、マウス移動のObservableを作成します
  • イベントからの変換にはObservable.FromEventというメソッドがあります
IObservable<MouseEventArgs> MouseDownAsObservable()  
{  
    return Observable.FromEvent<MouseEventHandler, MouseEventArgs>  
    (  
        handler => (sender, e) => handler(e),  
        handler => this.MouseDown += handler,  
        handler => this.MouseDown -= handler  
    );  
}  

IObservable<MouseEventArgs> MouseMoveAsObservable()  
{  
    return Observable.FromEvent<MouseEventHandler, MouseEventArgs>  
    (  
        handler => (sender, e) => handler(e),  
        handler => this.MouseMove += handler,  
        handler => this.MouseMove -= handler  
    );  
}  

IObservable<MouseEventArgs> MouseUpAsObservable()  
{  
    return Observable.FromEvent<MouseEventHandler, MouseEventArgs>  
    (  
        handler => (sender, e) => handler(e),  
        handler => this.MouseUp += handler,  
        handler => this.MouseUp -= handler  
    );  
}  
  • マウスイベントのストリームを組み合わせてドラッグのObservableを作成します
  • 処理の流れはコメントの通りです。
private void Drawing1()  
{  
    //dragのObserveble  
    var drag = this.MouseDownAsObservable()  
                .SelectMany(_ => this.MouseMoveAsObservable())  
                .TakeUntil(this.MouseUpAsObservable());  

    drag.Zip //--- 前後の値をZipでまとめる  
    (  
        drag.Skip(1),  
        (x, y) => new { Prev = x.Location, Next = y.Location }  
    )  
    .Repeat() //--- 何度も繰り返すためRepeat  
    .Delay(TimeSpan.FromMilliseconds(1000)) //---描画を1秒遅らせる  
    .Subscribe(location =>  
    {  
        //--- 前の点と次の点を直線で結ぶ    
        using (var graphic = this.CreateGraphics())  
        using (var pen = new Pen(Color.Red, 3))  
            graphic.DrawLine(pen, location.Prev, location.Next);  
    });  
}  
  • マウスの移動から1秒遅れて(Delayして)線が描画されます
  • ストリームはPublishにより分岐することもできます
  • dragのObservableを2つに分岐して1秒遅れてれて描画、2秒遅れて描画するようにしています
private void Drawing2()  
{  
    //dragのObserveble  
    var drag = this.MouseDownAsObservable()  
                .SelectMany(_ => this.MouseMoveAsObservable())  
                .TakeUntil(this.MouseUpAsObservable());  
    drag.Zip  //--- 前後の値をZipでまとめる   
    (  
        drag.Skip(1),  
        (x, y) => new { Prev = x.Location, Next = y.Location }  
    )  
    .Repeat() //--- 何度も繰り返すためRepeat                
    .Publish(xs => //--- Publishでストリームを分岐する    
    {  
        //1秒送れて描画  
        xs.Delay(TimeSpan.FromMilliseconds(1000))  
        .Subscribe(location =>  
        {                  
            using (var graphic = this.CreateGraphics())  
            using (var pen = new Pen(Color.Red, 3))  
                graphic.DrawLine(pen, location.Prev, location.Next);  
        });  

        //2秒送れて描画  
        xs.Delay(TimeSpan.FromMilliseconds(2000))  
       .Subscribe(location =>  
       {                      
            using (var graphic = this.CreateGraphics())  
            using (var pen = new Pen(Color.Blue, 3))  
               graphic.DrawLine(pen, location.Prev, location.Next);  
       });                 

        return xs;  
    }).Subscribe();  

}  

処理件数の通知

  • 重たい処理を実行してUI上に処理件数を表示するようなケースです
  • .NetではBackgroundWorkerやasync/awaitなど非同期処理がありますがRxでUI更新をしてみます
  • 重たい処理側で通知したいタイミングでonNextを呼び出しています
private void Button1_Click(object sender, EventArgs e)  
{  
    var provider = GetProvider1();  

    provider.SubscribeOn(ThreadPoolScheduler.Instance) //Subscribeするスケジューラを指定  
        .TakeUntil(ButtonClickAsObservable(this.btnStop)) //ボタンが押されるとonCompleteとなる    
        .ObserveOn(System.Threading.SynchronizationContext.Current)//onNextするスケジューラを指定  
        .Subscribe(x =>  
        {  
            this.button1.Enabled = false;  
            this.textBox1.Text = x;  
        }  
        , () =>  
        { //onComplete    
            this.button1.Enabled = true;  
            this.textBox1.Text = "処理終了";  
        }  
        );  
}  

private IObservable<string> GetProvider1()  
{  

    IObservable<string> provider = Observable.Create<string>(o => {  

        var counter = 10000; //処理件数の合計    
        for (var i = 1; i <= counter; i++)  
        {  
            o.OnNext(string.Format("{0}/{1}", i, counter)); //   n/N 形式の文字列を通知    
            System.Threading.Thread.Sleep(1000); //時間のかかる処理と仮定    
        }  
        o.OnCompleted(); //処理終了の通知    

        return System.Reactive.Disposables.Disposable.Empty; //何も返さないDisposable    
    });  

    return provider;  
}  

IObservable<EventArgs> ButtonClickAsObservable(Button btn)  
{  
    return Observable.FromEvent<EventHandler, EventArgs>  
    (  
        handler => (sender, e) => handler(e),  
        handler => btn.Click += handler,  
        handler => btn.Click -= handler  
    );  
}  

  • Sampleメソッドを通してSubscribeすると直近n秒で最新の値を使用するため、n秒毎に更新することができます
    処理件数が多くて1件ずつ描画する必要が無い場合にいいかもしれません
private void Button2_Click(object sender, EventArgs e)  
{  
    var provider = GetProvider2();  

    provider.SubscribeOn(ThreadPoolScheduler.Instance) //Subscribeするスケジューラを指定  
        .TakeUntil(ButtonClickAsObservable(this.btnStop)) //ボタンが押されるとonCompleteとなる    
        .Sample(TimeSpan.FromMilliseconds(300)) //直近300msで最新の値を使用(→300ms毎に更新)  
        .ObserveOn(System.Threading.SynchronizationContext.Current)//onNextするスケジューラを指定  
        .Subscribe(x =>  
        {  
            this.button1.Enabled = false;  
            this.textBox1.Text = x;  
        }  
        , () =>  
        { //onComplete    
            this.button1.Enabled = true;  
            this.textBox1.Text = "処理終了";  
        }  
        );  
}  

private IObservable<string> GetProvider2()  
{  
    var random = new System.Random();  
    IObservable<string> provider = Observable.Create<string>(o => {  

        var counter = 10000; //処理件数の合計    
        for (var i = 1; i <= counter; i++)  
        {  
            o.OnNext(string.Format("{0}/{1}", i, counter)); //   n/N 形式の文字列を通知    
            System.Threading.Thread.Sleep(random.Next(1,100) ); //Sleep時間をランダムに  
        }  
        o.OnCompleted(); //処理終了の通知    

        return System.Reactive.Disposables.Disposable.Empty; //何も返さないDisposable    
    });  

    return provider;  

}  

技術ブログをはじめよう Qrunch(クランチ)は、プログラマの技術アプトプットに特化したブログサービスです
駆け出しエンジニアからエキスパートまで全ての方々のアウトプットを歓迎しております!
or 外部アカウントで 登録 / ログイン する
クランチについてもっと詳しく

この記事が掲載されているブログ

@yayaの技術ブログ

よく一緒に読まれる記事

0件のコメント

ブログ開設 or ログイン してコメントを送ってみよう
目次をみる
技術ブログをはじめよう Qrunch(クランチ)は、プログラマの技術アプトプットに特化したブログサービスです
or 外部アカウントではじめる
10秒で技術ブログが作れます!