ソースコードから理解する技術-UnderSourceCode

手を動かす(プログラムを組む)ことで技術を理解するブログ

watermillを触ってみた

前回に引き続きメッセージを扱うライブラリを触ってみました。
github.com

「watermill」というもので、メッセージストリームを効率的に扱うためことを目的としているようです。
手始めにREADMEにある「Getting Started Guide」(https://watermill.io/docs/getting-started/)から始めてみたのですが、一番最初にあるサンプルを改変してみたので上げておきたいと思います。

ポイントとしては

  • メッセージを受信するためのChannelを複数(2つ)最初に定義する(messagesA、messagesB)
  • それぞれのChannel(messagesA、messagesB)にはトピック名を定義する
  • メッセージを受信するreceiveMessages()をgoroutineとして実行する
  • publishMessages()でトピック名を指定してメッセージを送信する
  • 最後に終了のログ(finish)も出している

点です。ソースは以下のようになります。
GitHub - SrcHndWng/go-watermill-simple-sample

package main

import (
	"context"
	"log"
	"time"

	"github.com/ThreeDotsLabs/watermill"
	"github.com/ThreeDotsLabs/watermill/message"
	"github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
)

func main() {
	pubSub := gochannel.NewGoChannel(
		gochannel.Config{},
		watermill.NewStdLogger(false, false),
	)

	messagesA, err := pubSub.Subscribe(context.Background(), "example.topic.A")
	if err != nil {
		panic(err)
	}
	messagesB, err := pubSub.Subscribe(context.Background(), "example.topic.B")
	if err != nil {
		panic(err)
	}

	go receiveMessages(messagesA)
	go receiveMessages(messagesB)

	publishMessages(pubSub)

	log.Println("finish!")
}

func publishMessages(pubSub *gochannel.GoChannel) error {
	publishMessage := func(publisher message.Publisher, topic string, msgStr string) error {
		msg := message.NewMessage(watermill.NewUUID(), []byte(msgStr))
		return publisher.Publish(topic, msg)
	}

	for i := 0; i < 5; i++ {
		if err := publishMessage(pubSub, "example.topic.A", "Hello, message A!"); err != nil {
			return err
		}
		if err := publishMessage(pubSub, "example.topic.B", "Hello, message B!"); err != nil {
			return err
		}
		time.Sleep(time.Second)
	}
	return nil
}

func receiveMessages(messages <-chan *message.Message) {
	for msg := range messages {
		log.Printf("received message: %s, payload: %s", msg.UUID, string(msg.Payload))
		msg.Ack() // Ack sends message's acknowledgement.
	}
}

動かしてみると分かりますが、A・Bのメッセージが受信される順番は、送信した順になるとは限らないようです。

メッセージを受信したら、「Ack()」で受信済である旨を通知することもポイントかと思います。

これ以外は、goroutineを使っていること、pushish~、subscribe~など一般的な用語がそのままメソッド名になっているため、分かりやすいライブラリかなと思います。