GCP Pub/Sub With .Net 6

前言

GCP 官方文件 - What is Pub/Sub

前陣子開發項目有使用到 GCP Pub/Sub,開發完後做個筆記紀錄一下細節

設定 Pub/Sub

  • 啟用服務

    gcloud services enable pubsub.googleapis.com
    
  • 建立 service account

GCP 官方文件 - Pub/Sub Access control with IAM

# 建立 service-accounts
gcloud iam service-accounts create $Name

# 綁定權限

# 因程式需要發布訊息、訂閱訊息,新增以下權限給服務帳戶 
gcloud projects add-iam-policy-binding $PROJECT_ID --member="serviceAccount:$Name@$PROJECT_ID.iam.gserviceaccount.com" --role="roles/pubsub.publisher"
gcloud projects add-iam-policy-binding $PROJECT_ID --member="serviceAccount:$Name@$PROJECT_ID.iam.gserviceaccount.com" --role="roles/pubsub.subscriber"

# 下載.json檔憑證至本地端 
gcloud iam service-accounts keys create $KEY_PATH --iam-account=$NAME@$PROJECT_ID.iam.gserviceaccount.com

ex:
gcloud iam service-accounts create leo-pubsub
gcloud projects add-iam-policy-binding xxx --member="serviceAccount:leo-pubsub@xxx.iam.gserviceaccount.com" --role="roles/pubsub.publisher"
gcloud projects add-iam-policy-binding xxx --member="serviceAccount:leo-pubsub@xxx.iam.gserviceaccount.com" --role="roles/pubsub.subscriber"
gcloud iam service-accounts keys create "D:\Lab\PubSub\pubsub-credential.json" --iam-account=$NAME@$PROJECT_ID.iam.gserviceaccount.com
gcloud pubsub topics create Test-Topic
gcloud pubsub subscriptions create "Test-Topic-Sub01" --topic=Test-Topic

以 .Net 6 專案為例

安裝套件

dotnet add package Google.Cloud.PubSub.V1

發布訊息

using Google.Cloud.PubSub.V1;

namespace GCP_PubSubProducer
{
    internal class Program
    {
        private static async Task Main(string[] args)
        {
            Console.WriteLine(" Start GCP_PubSubProducer");

            // 憑證
            Environment.SetEnvironmentVariable("GOOGLE_APPLICATION_CREDENTIALS", @"C:\Lab\PubSub\pubsub-credential.json");

            TopicName topicName = new TopicName("project-id", "Test-Topic");
            PublisherClient publisher = await PublisherClient.CreateAsync(topicName);

            Enumerable.Range(1, 10).ToList().ForEach(item =>
            {
                string msg = $"{item}";
                string messageId = publisher.PublishAsync(msg).GetAwaiter().GetResult();
                Console.WriteLine($"{DateTime.Now:yyyy/MM/dd HH:mm:ss.fff} MessageId:{messageId}, Sent {msg}");
            });

            Console.ReadKey();
        }
    }
}

接收訊息

  1. 訊息有四種類型 未確認、未完成 (當訊息發送給訂閱者後,在未確認前,該訊息會被標記成未完成)、已完成 (訊息已被成功確認)、已刪除
  2. 訊息發送給訂閱者後,在未確認前,該訊息會被標記成未完成,而未完成的訊息,不會再發給其他訂閱者 (假設有多台服務使用相同的subscription 來訂閱 Topic,不會重複收到相同的訊息,只會有一台收到)
  3. Pub/Sub 會不斷嘗試送出未確認的訊息,而未完成的訊息則會 pending 住
  4. 訊息發送給訂閱者後,訂閱者必須確認該訊息 (回覆 ACK or NACK),若收到 NACK 或在 ackDeadline 之前沒有確認訊息,則 Pub/Sub 會 Retry 重送

傳送類型

subscriptions Type 有三種

  1. Pull subscription
    GCP 官方文件 - Pull subscription
    Pull 又分兩種 streaming pull、pull

streaming pull 能夠及時接收訊息,有提供 Rest APIRPC 方式呼叫。
Google 封裝好的 SubscriberClient.StartAsync 就是 streaming pull,是透過 gRPC 進行 Client(我方)、Server(GCP Pub/Sub) 雙向連接

pull 則是在 Client(我方)服務資源有限的情況下,或者需要控制訂閱訊息數量時,可考慮使用,有提供 Rest APIRPC 方式呼叫。
假設需求是 每分鐘最多只處理100筆訂單,這種有限制訂閱訊息數量的情況下,可以考慮使用 pull 的方式,若需要低延遲性及時處理的話,則直接使用 streaming pull 較好

using Google.Cloud.PubSub.V1;
using Grpc.Core;

namespace GCP_PubSubConsumber
{
    internal class Program
    {
        private static async Task Main(string[] args)
        {
            Console.WriteLine(" Start GCP_PubSubConsumber");

            // 憑證
            Environment.SetEnvironmentVariable("GOOGLE_APPLICATION_CREDENTIALS", @"C:\Lab\PubSub\pubsub-credential.json");

            //await StreamPull();

            await Pull();
            Console.ReadKey();
        }

        private static async Task StreamPull()
        {
            SubscriptionName subscriptionName = new SubscriptionName("project-id", "Test-Topic-Sub01");
            SubscriberClient subscriber = await SubscriberClient.CreateAsync(subscriptionName);

            await subscriber.StartAsync((msg, cancellationToken) =>
            {
                Console.WriteLine($"MessageId {msg.MessageId}, Message: {msg.Data.ToStringUtf8()}, PublishTime {msg.PublishTime.ToDateTime()}");

                // Return Reply.Ack to indicate this message has been handled.
                return Task.FromResult(SubscriberClient.Reply.Ack);
            });
        }

        private static async Task Pull()
        {
            SubscriptionName subscriptionName = new SubscriptionName("project-id", "Test-Topic-Sub01");
            SubscriberServiceApiClient subscriberClient = SubscriberServiceApiClient.Create();
            int messageCount = 0;
            try
            {
                PullResponse response = await subscriberClient.PullAsync(subscriptionName, maxMessages: 10);
                foreach (ReceivedMessage msg in response.ReceivedMessages)
                {
                    string text = System.Text.Encoding.UTF8.GetString(msg.Message.Data.ToArray());
                    Console.WriteLine($"MessageId: {msg.Message.MessageId}, Message: {text}, PublishTime: {msg.Message.PublishTime}");
                    Interlocked.Increment(ref messageCount);

                    subscriberClient.Acknowledge(subscriptionName, response.ReceivedMessages.Select(msg => msg.AckId));
                }
            }
            catch (RpcException ex) when (ex.Status.StatusCode == StatusCode.Unavailable)
            {
                // UNAVAILABLE due to too many concurrent pull requests pending for the given subscription.
            }
        }
    }
}

備註: 假設在沒有訊息可訂閱時,執行 pull 則會阻塞執行續,直到有新的訊息進來,20 秒後會 timeout,不會發生異常,只是會得到空的 response.ReceivedMessages

  1. Push subscription
    可參考 w4560000 - GCP Pub/Sub 訂閱者以 Push 方式接收訊息並驗證JWT

  2. BigQuery subscription

    在建立 subscriptions 時,就設定好接收訊息直接寫入 BigQuery

訊息保留時間

未確認消息的保留時間,因 Pub/Sub 有訊息儲存費用,若未確認消息數量太多,導致訊息儲存容量較多,會產生較多額外費用

有效期限

若沒有任何訂閱活動 (連線、pull、確認消息),則在設定的有效天數後,該訂閱會被刪除,也可不設有效期限

確認期限

發送訊息給訂閱者後,在確認期限內

訂閱項目篩選器

篩選器只有在建立訂閱項目時才能設定,可透過attribute來篩選出想要接收的訊息

ex: attributes.test= “888”
GCP 官方文件 - Publish messages to topic Use attributes

當發布訊息時 有設定 attribute.test = “888”,才會被該訂閱者接收

private static async Task PublishWithAttributes()
{
    // 設定要發布的主題
    TopicName topicName = new TopicName(projectId, topicId);
    PublisherClient publisher = await PublisherClient.CreateAsync(topicName);

    // 發布訊息
    try
    {
        var pubsubMessage = new PubsubMessage
        {
            Data = ByteString.CopyFromUtf8("123"),
            Attributes =
            {
                { "test", "888" }
            }
        };
        string messageId = await publisher.PublishAsync(pubsubMessage);
        Console.WriteLine($"{DateTime.Now:yyyy/MM/dd HH:mm:ss.fff} messageId:{messageId}, Sent {JsonSerializer.Serialize(pubsubMessage)}");
    }
    catch (RpcException ex)
    {
        Console.WriteLine($"發布訊息到 Pub/Sub 時發生錯誤: {ex.Message}");
    }
}

僅傳送一次 (Exactly once)

未勾選時,是預設以 At-least-once

At-least-once 當確認時,已超過確認截止時間,則該筆訊息有可能被重複傳遞

當有勾選時,GCP Pub/Sub 保證

  • 當訊息被確認成功,則不會重複傳遞
  • 當訊息未完成時,不會重新傳遞 (未完成 = 在消息被確認前、傳遞後消息確認截止時間前)
  • 當訊息被多次傳遞後,只有最新一筆訊息的 acknowledgment ID 可以確認該筆訊息,先前的 acknowledgment ID 會失效

At-least once、Exactly once 差異可參考以下Blog說明
ALEX XU Blog - At most once, at least once, exactly once

訊息排序

當啟用了訊息排序使用相同 ordering key時,Pub/Sub 會依照發布時間來傳遞給訂閱者(先發布的先傳遞)

無效信件


會先設定好另一個 Topic 用來存放無效信件

當某一訂閱者在接收訊息後,在確認截止時間前未確認、或者回覆Nack,則計算一次傳遞次數
以上圖為例,當傳遞五次後,訊息仍然未被確認,則該訊息則不再會被訂閱者接收,而是轉拋到 DeadTopic 存放

重試政策

Retry 重送有兩種方式可以選,立即的話,跟字面上意思一樣,而指數輪巡延遲,則代表相同訊息若不斷訂閱失敗,則 Retry 的間隔時間會越來越長

情境補充

  1. 當多個服務都使用相同的 subscriptionName 訂閱相同 Topic 時,接收訊息時只會有一個服務會接收到,不會同時接收到同一筆訊息
  2. 當多個服務都使用不同的 subscriptionName 訂閱相同 Topic 時,接收訊息時則都會收到相同訊息

備註

  • 當初開發時,是採用 streaming pull 的方式來接收通知
  • 送到測試區時發現一直收不到通知,因查不出原因,維運同仁把防火牆關閉來測試才收得到通知
  • 後續維運同仁一一排查防火牆 Port,發現是該機器一開始建置時就沒有開通 UDP 53,導致無法解析 gcp pubsub 服務的 domain
  • 造成 streaming pull 其內部透過 gRPC 來連線至 gcp pubsub 服務異常,在此筆記一下

參考文件

GCP 官方文件 - What is Pub/Sub
GCP 官方文件 - Cloud Pub/Sub announces General Availability of exactly-once delivery


轉載請註明來源,若有任何錯誤或表達不清楚的地方,歡迎在下方評論區留言,也可以來信至 leozheng0621@gmail.com
如果文章對您有幫助,歡迎斗內(donate),請我喝杯咖啡

斗內💰

×

歡迎斗內

github