ソースコードから理解する技術-UnderSourceCode

手を動かす(プログラムを組む)ことで技術を理解するブログ

RxGoを触ってみた

RxGoというReactiveをGolangで実現するためのライブラリを触ってみました。
READMEのサンプルが分かりやすく、それを見るのがいいとは思いますが、自分の理解のために多少改変してみたりもしたのでサンプルとして上げておきます。

GitHub - SrcHndWng/go-rx-sample

helloworld

一番シンプルなサンプルで、READMEの"Simple Usage"を改修したものになります。
READMEのサンプルとの違いは、time.Sleepを使って1秒毎にメッセージを送信するようにした点です。
実案件ではtime.Sleepの他、何らかのイベントを切っ掛けにメッセージを送信することもあるかと思いますが、そのような使用を想定しました。

コメントに書いたように "not error raise. print 'Done!'"の行を実行すると全てを実行して"Done!"の表示まで実行します。
コメントアウトしてある"error sample"以下のif~else文を実行するようにし、 "not error raise.・・・"をコメントアウトするとobserverのErrHandlerにてエラーがhandleされます。

package main

import (
	"fmt"
	"time"

	"github.com/reactivex/rxgo/observable"
	"github.com/reactivex/rxgo/observer"
)

func main() {
	watcher := observer.Observer{
		NextHandler: func(item interface{}) {
			fmt.Printf("handled: %v\n", item)
		},

		ErrHandler: func(err error) {
			fmt.Printf("error: %v\n", err)
		},

		DoneHandler: func() {
			fmt.Println("Done!")
		},
	}

	message := make(chan interface{})
	source := observable.Observable(message)
	sub := source.Subscribe(watcher)

	go func() {
		cnt := 0
		for {
			if cnt > 4 {
				break
			}
			// error sample
			// if cnt == 3 {
			// 	message <- errors.New("some error")
			// } else {
			// 	message <- fmt.Sprintf("Hello, cnt = %d", cnt)
			// }
			message <- fmt.Sprintf("Hello, cnt = %d", cnt) // not error raise. print 'Done!'
			cnt++
			time.Sleep(1 * time.Second)
		}
		close(message)
	}()

	<-sub
}

grouping

READMEの"Simple Usage"の次のサンプルを改修したものになり、handlerを observer.Newでグループ化したものです。
ただ処理概要は先の"helloworld"と同じにしてあるので、比較してみてください。

package main

import (
	"fmt"
	"time"

	"github.com/reactivex/rxgo/handlers"
	"github.com/reactivex/rxgo/observable"
	"github.com/reactivex/rxgo/observer"
)

func main() {
	onNext := handlers.NextFunc(func(item interface{}) {
		fmt.Printf("handled: %v\n", item)
	})

	onError := handlers.ErrFunc(func(err error) {
		fmt.Printf("error: %v\n", err)
	})

	onDone := handlers.DoneFunc(func() {
		fmt.Println("Done!")
	})

	watcher := observer.New(onNext, onError, onDone)

	message := make(chan interface{})
	source := observable.Observable(message)
	sub := source.Subscribe(watcher)

	go func() {
		cnt := 0
		for {
			if cnt > 4 {
				break
			}
			// error sample
			// if cnt == 3 {
			// 	message <- errors.New("some error")
			// } else {
			// 	message <- fmt.Sprintf("Hello, cnt = %d", cnt)
			// }
			message <- fmt.Sprintf("Hello, cnt = %d", cnt) // not error raise. print 'Done!'
			cnt++
			time.Sleep(1 * time.Second)
		}
		close(message)
	}()

	<-sub
}

observable

READMEの"Recap"欄に「In RxGo, it's useful to think of Observable and Connectable as channels with additional ability to Subscribe handlers.」とあります。
これもREADMEのサンプルを改修したものですが

  • DBへの登録などの代わりに標準出力するようにした
  • エラーについても実装した

などを変えています。

実行してみると"f1"と"f2"の出力順序が逆になりますが、このことからも先に引用した"Recap"欄の旨が分かるかと思います。

package main

import (
	"errors"
	"fmt"
	"time"

	"github.com/reactivex/rxgo/handlers"
	"github.com/reactivex/rxgo/observable"
)

func main() {
	f1 := func() interface{} {

		// Simulate a blocking I/O
		time.Sleep(2 * time.Second)
		return 1
	}

	f2 := func() interface{} {

		// Simulate a blocking I/O
		time.Sleep(time.Second)
		return 2
	}

	f3 := func() interface{} {

		// Simulate a blocking I/O
		time.Sleep(3 * time.Second)
		return 3
	}

	f4 := func() interface{} {

		// Simulate a blocking I/O
		time.Sleep(4 * time.Second)
		return errors.New("some error")
	}

	onNext := handlers.NextFunc(func(v interface{}) {
		fmt.Printf("handled, v = %v\n", v)
	})

	wait := observable.Start(f1, f2, f3, f4).Subscribe(onNext)
	sub := <-wait

	if err := sub.Err(); err != nil {
		fmt.Printf("error raise. err = %v\n", err)
	}

	fmt.Println("finish")
}