RxGoを触ってみた
RxGoというReactiveをGolangで実現するためのライブラリを触ってみました。
READMEのサンプルが分かりやすく、それを見るのがいいとは思いますが、自分の理解のために多少改変してみたりもしたのでサンプルとして上げておきます。
GitHub - SrcHndWng/go-rx-sample
helloworld
一番シンプルなサンプルで、READMEの"Simple Usage"を改修したものになります。
READMEのサンプルとの違いは、time.Sleepを使って1秒毎にメッセージを送信するようにした点です。
実案件ではtime.Sleepの他、何らかのイベントを切っ掛けにメッセージを送信することもあるかと思いますが、そのような使用を想定しました。
コメントに書いたように "not error raise. print 'Done!'"の行を実行すると全てを実行して"Done!"の表示まで実行します。
コメントアウトしてある"error sample"以下のif~else文を実行するようにし、 "not error raise.・・・"をコメントアウトするとobserverのErrHandlerにてエラーがhandleされます。
package main import ( "fmt" "time" "github.com/reactivex/rxgo/observable" "github.com/reactivex/rxgo/observer" ) func main() { watcher := observer.Observer{ NextHandler: func(item interface{}) { fmt.Printf("handled: %v\n", item) }, ErrHandler: func(err error) { fmt.Printf("error: %v\n", err) }, DoneHandler: func() { fmt.Println("Done!") }, } message := make(chan interface{}) source := observable.Observable(message) sub := source.Subscribe(watcher) go func() { cnt := 0 for { if cnt > 4 { break } // error sample // if cnt == 3 { // message <- errors.New("some error") // } else { // message <- fmt.Sprintf("Hello, cnt = %d", cnt) // } message <- fmt.Sprintf("Hello, cnt = %d", cnt) // not error raise. print 'Done!' cnt++ time.Sleep(1 * time.Second) } close(message) }() <-sub }
grouping
READMEの"Simple Usage"の次のサンプルを改修したものになり、handlerを observer.Newでグループ化したものです。
ただ処理概要は先の"helloworld"と同じにしてあるので、比較してみてください。
package main import ( "fmt" "time" "github.com/reactivex/rxgo/handlers" "github.com/reactivex/rxgo/observable" "github.com/reactivex/rxgo/observer" ) func main() { onNext := handlers.NextFunc(func(item interface{}) { fmt.Printf("handled: %v\n", item) }) onError := handlers.ErrFunc(func(err error) { fmt.Printf("error: %v\n", err) }) onDone := handlers.DoneFunc(func() { fmt.Println("Done!") }) watcher := observer.New(onNext, onError, onDone) message := make(chan interface{}) source := observable.Observable(message) sub := source.Subscribe(watcher) go func() { cnt := 0 for { if cnt > 4 { break } // error sample // if cnt == 3 { // message <- errors.New("some error") // } else { // message <- fmt.Sprintf("Hello, cnt = %d", cnt) // } message <- fmt.Sprintf("Hello, cnt = %d", cnt) // not error raise. print 'Done!' cnt++ time.Sleep(1 * time.Second) } close(message) }() <-sub }
observable
READMEの"Recap"欄に「In RxGo, it's useful to think of Observable and Connectable as channels with additional ability to Subscribe handlers.」とあります。
これもREADMEのサンプルを改修したものですが
- DBへの登録などの代わりに標準出力するようにした
- エラーについても実装した
などを変えています。
実行してみると"f1"と"f2"の出力順序が逆になりますが、このことからも先に引用した"Recap"欄の旨が分かるかと思います。
package main import ( "errors" "fmt" "time" "github.com/reactivex/rxgo/handlers" "github.com/reactivex/rxgo/observable" ) func main() { f1 := func() interface{} { // Simulate a blocking I/O time.Sleep(2 * time.Second) return 1 } f2 := func() interface{} { // Simulate a blocking I/O time.Sleep(time.Second) return 2 } f3 := func() interface{} { // Simulate a blocking I/O time.Sleep(3 * time.Second) return 3 } f4 := func() interface{} { // Simulate a blocking I/O time.Sleep(4 * time.Second) return errors.New("some error") } onNext := handlers.NextFunc(func(v interface{}) { fmt.Printf("handled, v = %v\n", v) }) wait := observable.Start(f1, f2, f3, f4).Subscribe(onNext) sub := <-wait if err := sub.Err(); err != nil { fmt.Printf("error raise. err = %v\n", err) } fmt.Println("finish") }