前言
前陣子開發項目有使用到 GCP Pub/Sub,開發完後做個筆記紀錄一下細節
設定 Pub/Sub
啟用服務
gcloud services enable pubsub.googleapis.com
建立 service account
GCP 官方文件 - Pub/Sub Access control with IAM
# 建立 service-accounts
gcloud iam service-accounts create $Name
# 綁定權限
# 因程式需要發布訊息、訂閱訊息,新增以下權限給服務帳戶
gcloud projects add-iam-policy-binding $PROJECT_ID --member="serviceAccount:$Name@$PROJECT_ID.iam.gserviceaccount.com" --role="roles/pubsub.publisher"
gcloud projects add-iam-policy-binding $PROJECT_ID --member="serviceAccount:$Name@$PROJECT_ID.iam.gserviceaccount.com" --role="roles/pubsub.subscriber"
# 下載.json檔憑證至本地端
gcloud iam service-accounts keys create $KEY_PATH --iam-account=$NAME@$PROJECT_ID.iam.gserviceaccount.com
ex:
gcloud iam service-accounts create leo-pubsub
gcloud projects add-iam-policy-binding xxx --member="serviceAccount:leo-pubsub@xxx.iam.gserviceaccount.com" --role="roles/pubsub.publisher"
gcloud projects add-iam-policy-binding xxx --member="serviceAccount:leo-pubsub@xxx.iam.gserviceaccount.com" --role="roles/pubsub.subscriber"
gcloud iam service-accounts keys create "D:\Lab\PubSub\pubsub-credential.json" --iam-account=$NAME@$PROJECT_ID.iam.gserviceaccount.com
gcloud pubsub topics create Test-Topic
- 建立 Subscription
GCP 官方文件 - Create subscriptions
gcloud pubsub subscriptions create "Test-Topic-Sub01" --topic=Test-Topic
以 .Net 6 專案為例
安裝套件
dotnet add package Google.Cloud.PubSub.V1
發布訊息
using Google.Cloud.PubSub.V1;
namespace GCP_PubSubProducer
{
internal class Program
{
private static async Task Main(string[] args)
{
Console.WriteLine(" Start GCP_PubSubProducer");
// 憑證
Environment.SetEnvironmentVariable("GOOGLE_APPLICATION_CREDENTIALS", @"C:\Lab\PubSub\pubsub-credential.json");
TopicName topicName = new TopicName("project-id", "Test-Topic");
PublisherClient publisher = await PublisherClient.CreateAsync(topicName);
Enumerable.Range(1, 10).ToList().ForEach(item =>
{
string msg = $"{item}";
string messageId = publisher.PublishAsync(msg).GetAwaiter().GetResult();
Console.WriteLine($"{DateTime.Now:yyyy/MM/dd HH:mm:ss.fff} MessageId:{messageId}, Sent {msg}");
});
Console.ReadKey();
}
}
}
接收訊息
- 訊息有四種類型 未確認、未完成 (當訊息發送給訂閱者後,在未確認前,該訊息會被標記成未完成)、已完成 (訊息已被成功確認)、已刪除
- 訊息發送給訂閱者後,在未確認前,該訊息會被標記成未完成,而未完成的訊息,不會再發給其他訂閱者 (假設有多台服務使用相同的subscription 來訂閱 Topic,不會重複收到相同的訊息,只會有一台收到)
- Pub/Sub 會不斷嘗試送出未確認的訊息,而未完成的訊息則會 pending 住
- 訊息發送給訂閱者後,訂閱者必須確認該訊息 (回覆 ACK or NACK),若收到 NACK 或在 ackDeadline 之前沒有確認訊息,則 Pub/Sub 會 Retry 重送
傳送類型
subscriptions Type 有三種
- Pull subscription
GCP 官方文件 - Pull subscription
Pull 又分兩種 streaming pull、pull
streaming pull 能夠及時接收訊息,有提供 Rest API、RPC 方式呼叫。
Google 封裝好的 SubscriberClient.StartAsync 就是 streaming pull,是透過 gRPC 進行 Client(我方)、Server(GCP Pub/Sub) 雙向連接
pull 則是在 Client(我方)服務資源有限的情況下,或者需要控制訂閱訊息數量時,可考慮使用,有提供 Rest API、RPC 方式呼叫。
假設需求是 每分鐘最多只處理100筆訂單,這種有限制訂閱訊息數量的情況下,可以考慮使用 pull 的方式,若需要低延遲性及時處理的話,則直接使用 streaming pull 較好
using Google.Cloud.PubSub.V1;
using Grpc.Core;
namespace GCP_PubSubConsumber
{
internal class Program
{
private static async Task Main(string[] args)
{
Console.WriteLine(" Start GCP_PubSubConsumber");
// 憑證
Environment.SetEnvironmentVariable("GOOGLE_APPLICATION_CREDENTIALS", @"C:\Lab\PubSub\pubsub-credential.json");
//await StreamPull();
await Pull();
Console.ReadKey();
}
private static async Task StreamPull()
{
SubscriptionName subscriptionName = new SubscriptionName("project-id", "Test-Topic-Sub01");
SubscriberClient subscriber = await SubscriberClient.CreateAsync(subscriptionName);
await subscriber.StartAsync((msg, cancellationToken) =>
{
Console.WriteLine($"MessageId {msg.MessageId}, Message: {msg.Data.ToStringUtf8()}, PublishTime {msg.PublishTime.ToDateTime()}");
// Return Reply.Ack to indicate this message has been handled.
return Task.FromResult(SubscriberClient.Reply.Ack);
});
}
private static async Task Pull()
{
SubscriptionName subscriptionName = new SubscriptionName("project-id", "Test-Topic-Sub01");
SubscriberServiceApiClient subscriberClient = SubscriberServiceApiClient.Create();
int messageCount = 0;
try
{
PullResponse response = await subscriberClient.PullAsync(subscriptionName, maxMessages: 10);
foreach (ReceivedMessage msg in response.ReceivedMessages)
{
string text = System.Text.Encoding.UTF8.GetString(msg.Message.Data.ToArray());
Console.WriteLine($"MessageId: {msg.Message.MessageId}, Message: {text}, PublishTime: {msg.Message.PublishTime}");
Interlocked.Increment(ref messageCount);
subscriberClient.Acknowledge(subscriptionName, response.ReceivedMessages.Select(msg => msg.AckId));
}
}
catch (RpcException ex) when (ex.Status.StatusCode == StatusCode.Unavailable)
{
// UNAVAILABLE due to too many concurrent pull requests pending for the given subscription.
}
}
}
}
備註: 假設在沒有訊息可訂閱時,執行 pull 則會阻塞執行續,直到有新的訊息進來,20 秒後會 timeout,不會發生異常,只是會得到空的 response.ReceivedMessages
Push subscription
可參考 w4560000 - GCP Pub/Sub 訂閱者以 Push 方式接收訊息並驗證JWTBigQuery subscription
在建立 subscriptions 時,就設定好接收訊息直接寫入 BigQuery
訊息保留時間
未確認消息的保留時間,因 Pub/Sub 有訊息儲存費用,若未確認消息數量太多,導致訊息儲存容量較多,會產生較多額外費用
有效期限
若沒有任何訂閱活動 (連線、pull、確認消息),則在設定的有效天數後,該訂閱會被刪除,也可不設有效期限
確認期限
發送訊息給訂閱者後,在確認期限內
訂閱項目篩選器
篩選器只有在建立訂閱項目時才能設定,可透過attribute來篩選出想要接收的訊息
ex: attributes.test= “888”
GCP 官方文件 - Publish messages to topic Use attributes
當發布訊息時 有設定 attribute.test = “888”,才會被該訂閱者接收
private static async Task PublishWithAttributes()
{
// 設定要發布的主題
TopicName topicName = new TopicName(projectId, topicId);
PublisherClient publisher = await PublisherClient.CreateAsync(topicName);
// 發布訊息
try
{
var pubsubMessage = new PubsubMessage
{
Data = ByteString.CopyFromUtf8("123"),
Attributes =
{
{ "test", "888" }
}
};
string messageId = await publisher.PublishAsync(pubsubMessage);
Console.WriteLine($"{DateTime.Now:yyyy/MM/dd HH:mm:ss.fff} messageId:{messageId}, Sent {JsonSerializer.Serialize(pubsubMessage)}");
}
catch (RpcException ex)
{
Console.WriteLine($"發布訊息到 Pub/Sub 時發生錯誤: {ex.Message}");
}
}
僅傳送一次 (Exactly once)
未勾選時,是預設以 At-least-once
At-least-once 當確認時,已超過確認截止時間,則該筆訊息有可能被重複傳遞
當有勾選時,GCP Pub/Sub 保證
- 當訊息被確認成功,則不會重複傳遞
- 當訊息未完成時,不會重新傳遞 (未完成 = 在消息被確認前、傳遞後消息確認截止時間前)
- 當訊息被多次傳遞後,只有最新一筆訊息的 acknowledgment ID 可以確認該筆訊息,先前的 acknowledgment ID 會失效
At-least once、Exactly once 差異可參考以下Blog說明
ALEX XU Blog - At most once, at least once, exactly once
訊息排序
當啟用了訊息排序使用相同 ordering key時,Pub/Sub 會依照發布時間來傳遞給訂閱者(先發布的先傳遞)
無效信件
會先設定好另一個 Topic 用來存放無效信件
當某一訂閱者在接收訊息後,在確認截止時間前未確認、或者回覆Nack,則計算一次傳遞次數
以上圖為例,當傳遞五次後,訊息仍然未被確認,則該訊息則不再會被訂閱者接收,而是轉拋到 DeadTopic 存放
重試政策
Retry 重送有兩種方式可以選,立即的話,跟字面上意思一樣,而指數輪巡延遲,則代表相同訊息若不斷訂閱失敗,則 Retry 的間隔時間會越來越長
情境補充
- 當多個服務都使用相同的 subscriptionName 訂閱相同 Topic 時,接收訊息時只會有一個服務會接收到,不會同時接收到同一筆訊息
- 當多個服務都使用不同的 subscriptionName 訂閱相同 Topic 時,接收訊息時則都會收到相同訊息
備註
- 當初開發時,是採用 streaming pull 的方式來接收通知
- 送到測試區時發現一直收不到通知,因查不出原因,維運同仁把防火牆關閉來測試才收得到通知
- 後續維運同仁一一排查防火牆 Port,發現是該機器一開始建置時就沒有開通 UDP 53,導致無法解析 gcp pubsub 服務的 domain
- 造成 streaming pull 其內部透過 gRPC 來連線至 gcp pubsub 服務異常,在此筆記一下
參考文件
GCP 官方文件 - What is Pub/Sub
GCP 官方文件 - Cloud Pub/Sub announces General Availability of exactly-once delivery
轉載請註明來源,若有任何錯誤或表達不清楚的地方,歡迎在下方評論區留言,也可以來信至 leozheng0621@gmail.com
如果文章對您有幫助,歡迎斗內(donate),請我喝杯咖啡