feat: create database tables and queries for notifications (#13536)

This commit is contained in:
Danny Kopping
2024-06-28 11:21:25 +02:00
committed by GitHub
parent 4213560b7a
commit 0a221e8d5b
17 changed files with 999 additions and 0 deletions

View File

@ -817,6 +817,13 @@ func (q *querier) AcquireLock(ctx context.Context, id int64) error {
return q.db.AcquireLock(ctx, id)
}
func (q *querier) AcquireNotificationMessages(ctx context.Context, arg database.AcquireNotificationMessagesParams) ([]database.AcquireNotificationMessagesRow, error) {
if err := q.authorizeContext(ctx, policy.ActionUpdate, rbac.ResourceSystem); err != nil {
return nil, err
}
return q.db.AcquireNotificationMessages(ctx, arg)
}
// TODO: We need to create a ProvisionerJob resource type
func (q *querier) AcquireProvisionerJob(ctx context.Context, arg database.AcquireProvisionerJobParams) (database.ProvisionerJob, error) {
// if err := q.authorizeContext(ctx, policy.ActionUpdate, rbac.ResourceSystem); err != nil {
@ -861,6 +868,20 @@ func (q *querier) BatchUpdateWorkspaceLastUsedAt(ctx context.Context, arg databa
return q.db.BatchUpdateWorkspaceLastUsedAt(ctx, arg)
}
func (q *querier) BulkMarkNotificationMessagesFailed(ctx context.Context, arg database.BulkMarkNotificationMessagesFailedParams) (int64, error) {
if err := q.authorizeContext(ctx, policy.ActionUpdate, rbac.ResourceSystem); err != nil {
return 0, err
}
return q.db.BulkMarkNotificationMessagesFailed(ctx, arg)
}
func (q *querier) BulkMarkNotificationMessagesSent(ctx context.Context, arg database.BulkMarkNotificationMessagesSentParams) (int64, error) {
if err := q.authorizeContext(ctx, policy.ActionUpdate, rbac.ResourceSystem); err != nil {
return 0, err
}
return q.db.BulkMarkNotificationMessagesSent(ctx, arg)
}
func (q *querier) CleanTailnetCoordinators(ctx context.Context) error {
if err := q.authorizeContext(ctx, policy.ActionDelete, rbac.ResourceTailnetCoordinator); err != nil {
return err
@ -1010,6 +1031,13 @@ func (q *querier) DeleteOAuth2ProviderAppTokensByAppAndUserID(ctx context.Contex
return q.db.DeleteOAuth2ProviderAppTokensByAppAndUserID(ctx, arg)
}
func (q *querier) DeleteOldNotificationMessages(ctx context.Context) error {
if err := q.authorizeContext(ctx, policy.ActionDelete, rbac.ResourceSystem); err != nil {
return err
}
return q.db.DeleteOldNotificationMessages(ctx)
}
func (q *querier) DeleteOldProvisionerDaemons(ctx context.Context) error {
if err := q.authorizeContext(ctx, policy.ActionDelete, rbac.ResourceSystem); err != nil {
return err
@ -1114,6 +1142,13 @@ func (q *querier) DeleteWorkspaceAgentPortSharesByTemplate(ctx context.Context,
return q.db.DeleteWorkspaceAgentPortSharesByTemplate(ctx, templateID)
}
func (q *querier) EnqueueNotificationMessage(ctx context.Context, arg database.EnqueueNotificationMessageParams) (database.NotificationMessage, error) {
if err := q.authorizeContext(ctx, policy.ActionCreate, rbac.ResourceSystem); err != nil {
return database.NotificationMessage{}, err
}
return q.db.EnqueueNotificationMessage(ctx, arg)
}
func (q *querier) FavoriteWorkspace(ctx context.Context, id uuid.UUID) error {
fetch := func(ctx context.Context, id uuid.UUID) (database.Workspace, error) {
return q.db.GetWorkspaceByID(ctx, id)
@ -1121,6 +1156,13 @@ func (q *querier) FavoriteWorkspace(ctx context.Context, id uuid.UUID) error {
return update(q.log, q.auth, fetch, q.db.FavoriteWorkspace)(ctx, id)
}
func (q *querier) FetchNewMessageMetadata(ctx context.Context, arg database.FetchNewMessageMetadataParams) (database.FetchNewMessageMetadataRow, error) {
if err := q.authorizeContext(ctx, policy.ActionRead, rbac.ResourceSystem); err != nil {
return database.FetchNewMessageMetadataRow{}, err
}
return q.db.FetchNewMessageMetadata(ctx, arg)
}
func (q *querier) GetAPIKeyByID(ctx context.Context, id string) (database.APIKey, error) {
return fetch(q.log, q.auth, q.db.GetAPIKeyByID)(ctx, id)
}

View File

@ -2467,6 +2467,32 @@ func (s *MethodTestSuite) TestSystemFunctions() {
AgentID: uuid.New(),
}).Asserts(tpl, policy.ActionCreate)
}))
s.Run("AcquireNotificationMessages", s.Subtest(func(db database.Store, check *expects) {
// TODO: update this test once we have a specific role for notifications
check.Args(database.AcquireNotificationMessagesParams{}).Asserts(rbac.ResourceSystem, policy.ActionUpdate)
}))
s.Run("BulkMarkNotificationMessagesFailed", s.Subtest(func(db database.Store, check *expects) {
// TODO: update this test once we have a specific role for notifications
check.Args(database.BulkMarkNotificationMessagesFailedParams{}).Asserts(rbac.ResourceSystem, policy.ActionUpdate)
}))
s.Run("BulkMarkNotificationMessagesSent", s.Subtest(func(db database.Store, check *expects) {
// TODO: update this test once we have a specific role for notifications
check.Args(database.BulkMarkNotificationMessagesSentParams{}).Asserts(rbac.ResourceSystem, policy.ActionUpdate)
}))
s.Run("DeleteOldNotificationMessages", s.Subtest(func(db database.Store, check *expects) {
// TODO: update this test once we have a specific role for notifications
check.Args().Asserts(rbac.ResourceSystem, policy.ActionDelete)
}))
s.Run("EnqueueNotificationMessage", s.Subtest(func(db database.Store, check *expects) {
// TODO: update this test once we have a specific role for notifications
check.Args(database.EnqueueNotificationMessageParams{
Method: database.NotificationMethodWebhook,
}).Asserts(rbac.ResourceSystem, policy.ActionCreate)
}))
s.Run("FetchNewMessageMetadata", s.Subtest(func(db database.Store, check *expects) {
// TODO: update this test once we have a specific role for notifications
check.Args(database.FetchNewMessageMetadataParams{}).Asserts(rbac.ResourceSystem, policy.ActionRead)
}))
}
func (s *MethodTestSuite) TestOAuth2ProviderApps() {

View File

@ -907,6 +907,15 @@ func (*FakeQuerier) AcquireLock(_ context.Context, _ int64) error {
return xerrors.New("AcquireLock must only be called within a transaction")
}
func (*FakeQuerier) AcquireNotificationMessages(_ context.Context, arg database.AcquireNotificationMessagesParams) ([]database.AcquireNotificationMessagesRow, error) {
err := validateDatabaseType(arg)
if err != nil {
return nil, err
}
// nolint:nilnil // Irrelevant.
return nil, nil
}
func (q *FakeQuerier) AcquireProvisionerJob(_ context.Context, arg database.AcquireProvisionerJobParams) (database.ProvisionerJob, error) {
if err := validateDatabaseType(arg); err != nil {
return database.ProvisionerJob{}, err
@ -1169,6 +1178,22 @@ func (q *FakeQuerier) BatchUpdateWorkspaceLastUsedAt(_ context.Context, arg data
return nil
}
func (*FakeQuerier) BulkMarkNotificationMessagesFailed(_ context.Context, arg database.BulkMarkNotificationMessagesFailedParams) (int64, error) {
err := validateDatabaseType(arg)
if err != nil {
return 0, err
}
return -1, nil
}
func (*FakeQuerier) BulkMarkNotificationMessagesSent(_ context.Context, arg database.BulkMarkNotificationMessagesSentParams) (int64, error) {
err := validateDatabaseType(arg)
if err != nil {
return 0, err
}
return -1, nil
}
func (*FakeQuerier) CleanTailnetCoordinators(_ context.Context) error {
return ErrUnimplemented
}
@ -1504,6 +1529,10 @@ func (q *FakeQuerier) DeleteOAuth2ProviderAppTokensByAppAndUserID(_ context.Cont
return nil
}
func (*FakeQuerier) DeleteOldNotificationMessages(_ context.Context) error {
return nil
}
func (q *FakeQuerier) DeleteOldProvisionerDaemons(_ context.Context) error {
q.mutex.Lock()
defer q.mutex.Unlock()
@ -1737,6 +1766,14 @@ func (q *FakeQuerier) DeleteWorkspaceAgentPortSharesByTemplate(_ context.Context
return nil
}
func (*FakeQuerier) EnqueueNotificationMessage(_ context.Context, arg database.EnqueueNotificationMessageParams) (database.NotificationMessage, error) {
err := validateDatabaseType(arg)
if err != nil {
return database.NotificationMessage{}, err
}
return database.NotificationMessage{}, nil
}
func (q *FakeQuerier) FavoriteWorkspace(_ context.Context, arg uuid.UUID) error {
err := validateDatabaseType(arg)
if err != nil {
@ -1756,6 +1793,14 @@ func (q *FakeQuerier) FavoriteWorkspace(_ context.Context, arg uuid.UUID) error
return nil
}
func (*FakeQuerier) FetchNewMessageMetadata(_ context.Context, arg database.FetchNewMessageMetadataParams) (database.FetchNewMessageMetadataRow, error) {
err := validateDatabaseType(arg)
if err != nil {
return database.FetchNewMessageMetadataRow{}, err
}
return database.FetchNewMessageMetadataRow{}, nil
}
func (q *FakeQuerier) GetAPIKeyByID(_ context.Context, id string) (database.APIKey, error) {
q.mutex.RLock()
defer q.mutex.RUnlock()

View File

@ -88,6 +88,13 @@ func (m metricsStore) AcquireLock(ctx context.Context, pgAdvisoryXactLock int64)
return err
}
func (m metricsStore) AcquireNotificationMessages(ctx context.Context, arg database.AcquireNotificationMessagesParams) ([]database.AcquireNotificationMessagesRow, error) {
start := time.Now()
r0, r1 := m.s.AcquireNotificationMessages(ctx, arg)
m.queryLatencies.WithLabelValues("AcquireNotificationMessages").Observe(time.Since(start).Seconds())
return r0, r1
}
func (m metricsStore) AcquireProvisionerJob(ctx context.Context, arg database.AcquireProvisionerJobParams) (database.ProvisionerJob, error) {
start := time.Now()
provisionerJob, err := m.s.AcquireProvisionerJob(ctx, arg)
@ -123,6 +130,20 @@ func (m metricsStore) BatchUpdateWorkspaceLastUsedAt(ctx context.Context, arg da
return r0
}
func (m metricsStore) BulkMarkNotificationMessagesFailed(ctx context.Context, arg database.BulkMarkNotificationMessagesFailedParams) (int64, error) {
start := time.Now()
r0, r1 := m.s.BulkMarkNotificationMessagesFailed(ctx, arg)
m.queryLatencies.WithLabelValues("BulkMarkNotificationMessagesFailed").Observe(time.Since(start).Seconds())
return r0, r1
}
func (m metricsStore) BulkMarkNotificationMessagesSent(ctx context.Context, arg database.BulkMarkNotificationMessagesSentParams) (int64, error) {
start := time.Now()
r0, r1 := m.s.BulkMarkNotificationMessagesSent(ctx, arg)
m.queryLatencies.WithLabelValues("BulkMarkNotificationMessagesSent").Observe(time.Since(start).Seconds())
return r0, r1
}
func (m metricsStore) CleanTailnetCoordinators(ctx context.Context) error {
start := time.Now()
err := m.s.CleanTailnetCoordinators(ctx)
@ -263,6 +284,13 @@ func (m metricsStore) DeleteOAuth2ProviderAppTokensByAppAndUserID(ctx context.Co
return r0
}
func (m metricsStore) DeleteOldNotificationMessages(ctx context.Context) error {
start := time.Now()
r0 := m.s.DeleteOldNotificationMessages(ctx)
m.queryLatencies.WithLabelValues("DeleteOldNotificationMessages").Observe(time.Since(start).Seconds())
return r0
}
func (m metricsStore) DeleteOldProvisionerDaemons(ctx context.Context) error {
start := time.Now()
r0 := m.s.DeleteOldProvisionerDaemons(ctx)
@ -354,6 +382,13 @@ func (m metricsStore) DeleteWorkspaceAgentPortSharesByTemplate(ctx context.Conte
return r0
}
func (m metricsStore) EnqueueNotificationMessage(ctx context.Context, arg database.EnqueueNotificationMessageParams) (database.NotificationMessage, error) {
start := time.Now()
r0, r1 := m.s.EnqueueNotificationMessage(ctx, arg)
m.queryLatencies.WithLabelValues("EnqueueNotificationMessage").Observe(time.Since(start).Seconds())
return r0, r1
}
func (m metricsStore) FavoriteWorkspace(ctx context.Context, arg uuid.UUID) error {
start := time.Now()
r0 := m.s.FavoriteWorkspace(ctx, arg)
@ -361,6 +396,13 @@ func (m metricsStore) FavoriteWorkspace(ctx context.Context, arg uuid.UUID) erro
return r0
}
func (m metricsStore) FetchNewMessageMetadata(ctx context.Context, arg database.FetchNewMessageMetadataParams) (database.FetchNewMessageMetadataRow, error) {
start := time.Now()
r0, r1 := m.s.FetchNewMessageMetadata(ctx, arg)
m.queryLatencies.WithLabelValues("FetchNewMessageMetadata").Observe(time.Since(start).Seconds())
return r0, r1
}
func (m metricsStore) GetAPIKeyByID(ctx context.Context, id string) (database.APIKey, error) {
start := time.Now()
apiKey, err := m.s.GetAPIKeyByID(ctx, id)

View File

@ -58,6 +58,21 @@ func (mr *MockStoreMockRecorder) AcquireLock(arg0, arg1 any) *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AcquireLock", reflect.TypeOf((*MockStore)(nil).AcquireLock), arg0, arg1)
}
// AcquireNotificationMessages mocks base method.
func (m *MockStore) AcquireNotificationMessages(arg0 context.Context, arg1 database.AcquireNotificationMessagesParams) ([]database.AcquireNotificationMessagesRow, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "AcquireNotificationMessages", arg0, arg1)
ret0, _ := ret[0].([]database.AcquireNotificationMessagesRow)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// AcquireNotificationMessages indicates an expected call of AcquireNotificationMessages.
func (mr *MockStoreMockRecorder) AcquireNotificationMessages(arg0, arg1 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AcquireNotificationMessages", reflect.TypeOf((*MockStore)(nil).AcquireNotificationMessages), arg0, arg1)
}
// AcquireProvisionerJob mocks base method.
func (m *MockStore) AcquireProvisionerJob(arg0 context.Context, arg1 database.AcquireProvisionerJobParams) (database.ProvisionerJob, error) {
m.ctrl.T.Helper()
@ -131,6 +146,36 @@ func (mr *MockStoreMockRecorder) BatchUpdateWorkspaceLastUsedAt(arg0, arg1 any)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BatchUpdateWorkspaceLastUsedAt", reflect.TypeOf((*MockStore)(nil).BatchUpdateWorkspaceLastUsedAt), arg0, arg1)
}
// BulkMarkNotificationMessagesFailed mocks base method.
func (m *MockStore) BulkMarkNotificationMessagesFailed(arg0 context.Context, arg1 database.BulkMarkNotificationMessagesFailedParams) (int64, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "BulkMarkNotificationMessagesFailed", arg0, arg1)
ret0, _ := ret[0].(int64)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// BulkMarkNotificationMessagesFailed indicates an expected call of BulkMarkNotificationMessagesFailed.
func (mr *MockStoreMockRecorder) BulkMarkNotificationMessagesFailed(arg0, arg1 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BulkMarkNotificationMessagesFailed", reflect.TypeOf((*MockStore)(nil).BulkMarkNotificationMessagesFailed), arg0, arg1)
}
// BulkMarkNotificationMessagesSent mocks base method.
func (m *MockStore) BulkMarkNotificationMessagesSent(arg0 context.Context, arg1 database.BulkMarkNotificationMessagesSentParams) (int64, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "BulkMarkNotificationMessagesSent", arg0, arg1)
ret0, _ := ret[0].(int64)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// BulkMarkNotificationMessagesSent indicates an expected call of BulkMarkNotificationMessagesSent.
func (mr *MockStoreMockRecorder) BulkMarkNotificationMessagesSent(arg0, arg1 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BulkMarkNotificationMessagesSent", reflect.TypeOf((*MockStore)(nil).BulkMarkNotificationMessagesSent), arg0, arg1)
}
// CleanTailnetCoordinators mocks base method.
func (m *MockStore) CleanTailnetCoordinators(arg0 context.Context) error {
m.ctrl.T.Helper()
@ -413,6 +458,20 @@ func (mr *MockStoreMockRecorder) DeleteOAuth2ProviderAppTokensByAppAndUserID(arg
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteOAuth2ProviderAppTokensByAppAndUserID", reflect.TypeOf((*MockStore)(nil).DeleteOAuth2ProviderAppTokensByAppAndUserID), arg0, arg1)
}
// DeleteOldNotificationMessages mocks base method.
func (m *MockStore) DeleteOldNotificationMessages(arg0 context.Context) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "DeleteOldNotificationMessages", arg0)
ret0, _ := ret[0].(error)
return ret0
}
// DeleteOldNotificationMessages indicates an expected call of DeleteOldNotificationMessages.
func (mr *MockStoreMockRecorder) DeleteOldNotificationMessages(arg0 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteOldNotificationMessages", reflect.TypeOf((*MockStore)(nil).DeleteOldNotificationMessages), arg0)
}
// DeleteOldProvisionerDaemons mocks base method.
func (m *MockStore) DeleteOldProvisionerDaemons(arg0 context.Context) error {
m.ctrl.T.Helper()
@ -599,6 +658,21 @@ func (mr *MockStoreMockRecorder) DeleteWorkspaceAgentPortSharesByTemplate(arg0,
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteWorkspaceAgentPortSharesByTemplate", reflect.TypeOf((*MockStore)(nil).DeleteWorkspaceAgentPortSharesByTemplate), arg0, arg1)
}
// EnqueueNotificationMessage mocks base method.
func (m *MockStore) EnqueueNotificationMessage(arg0 context.Context, arg1 database.EnqueueNotificationMessageParams) (database.NotificationMessage, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "EnqueueNotificationMessage", arg0, arg1)
ret0, _ := ret[0].(database.NotificationMessage)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// EnqueueNotificationMessage indicates an expected call of EnqueueNotificationMessage.
func (mr *MockStoreMockRecorder) EnqueueNotificationMessage(arg0, arg1 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "EnqueueNotificationMessage", reflect.TypeOf((*MockStore)(nil).EnqueueNotificationMessage), arg0, arg1)
}
// FavoriteWorkspace mocks base method.
func (m *MockStore) FavoriteWorkspace(arg0 context.Context, arg1 uuid.UUID) error {
m.ctrl.T.Helper()
@ -613,6 +687,21 @@ func (mr *MockStoreMockRecorder) FavoriteWorkspace(arg0, arg1 any) *gomock.Call
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FavoriteWorkspace", reflect.TypeOf((*MockStore)(nil).FavoriteWorkspace), arg0, arg1)
}
// FetchNewMessageMetadata mocks base method.
func (m *MockStore) FetchNewMessageMetadata(arg0 context.Context, arg1 database.FetchNewMessageMetadataParams) (database.FetchNewMessageMetadataRow, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "FetchNewMessageMetadata", arg0, arg1)
ret0, _ := ret[0].(database.FetchNewMessageMetadataRow)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// FetchNewMessageMetadata indicates an expected call of FetchNewMessageMetadata.
func (mr *MockStoreMockRecorder) FetchNewMessageMetadata(arg0, arg1 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchNewMessageMetadata", reflect.TypeOf((*MockStore)(nil).FetchNewMessageMetadata), arg0, arg1)
}
// GetAPIKeyByID mocks base method.
func (m *MockStore) GetAPIKeyByID(arg0 context.Context, arg1 string) (database.APIKey, error) {
m.ctrl.T.Helper()

View File

@ -58,6 +58,9 @@ func New(ctx context.Context, logger slog.Logger, db database.Store) io.Closer {
if err := tx.DeleteOldProvisionerDaemons(ctx); err != nil {
return xerrors.Errorf("failed to delete old provisioner daemons: %w", err)
}
if err := tx.DeleteOldNotificationMessages(ctx); err != nil {
return xerrors.Errorf("failed to delete old notification messages: %w", err)
}
logger.Info(ctx, "purged old database entries", slog.F("duration", time.Since(start)))

View File

@ -78,6 +78,20 @@ CREATE TYPE name_organization_pair AS (
organization_id uuid
);
CREATE TYPE notification_message_status AS ENUM (
'pending',
'leased',
'sent',
'permanent_failure',
'temporary_failure',
'unknown'
);
CREATE TYPE notification_method AS ENUM (
'smtp',
'webhook'
);
CREATE TYPE parameter_destination_scheme AS ENUM (
'none',
'environment_variable',
@ -534,6 +548,34 @@ CREATE SEQUENCE licenses_id_seq
ALTER SEQUENCE licenses_id_seq OWNED BY licenses.id;
CREATE TABLE notification_messages (
id uuid NOT NULL,
notification_template_id uuid NOT NULL,
user_id uuid NOT NULL,
method notification_method NOT NULL,
status notification_message_status DEFAULT 'pending'::notification_message_status NOT NULL,
status_reason text,
created_by text NOT NULL,
payload jsonb NOT NULL,
attempt_count integer DEFAULT 0,
targets uuid[],
created_at timestamp with time zone DEFAULT CURRENT_TIMESTAMP NOT NULL,
updated_at timestamp with time zone,
leased_until timestamp with time zone,
next_retry_after timestamp with time zone
);
CREATE TABLE notification_templates (
id uuid NOT NULL,
name text NOT NULL,
title_template text NOT NULL,
body_template text NOT NULL,
actions jsonb,
"group" text
);
COMMENT ON TABLE notification_templates IS 'Templates from which to create notification messages.';
CREATE TABLE oauth2_provider_app_codes (
id uuid NOT NULL,
created_at timestamp with time zone NOT NULL,
@ -1473,6 +1515,15 @@ ALTER TABLE ONLY licenses
ALTER TABLE ONLY licenses
ADD CONSTRAINT licenses_pkey PRIMARY KEY (id);
ALTER TABLE ONLY notification_messages
ADD CONSTRAINT notification_messages_pkey PRIMARY KEY (id);
ALTER TABLE ONLY notification_templates
ADD CONSTRAINT notification_templates_name_key UNIQUE (name);
ALTER TABLE ONLY notification_templates
ADD CONSTRAINT notification_templates_pkey PRIMARY KEY (id);
ALTER TABLE ONLY oauth2_provider_app_codes
ADD CONSTRAINT oauth2_provider_app_codes_pkey PRIMARY KEY (id);
@ -1652,6 +1703,8 @@ CREATE INDEX idx_custom_roles_id ON custom_roles USING btree (id);
CREATE UNIQUE INDEX idx_custom_roles_name_lower ON custom_roles USING btree (lower(name));
CREATE INDEX idx_notification_messages_status ON notification_messages USING btree (status);
CREATE INDEX idx_organization_member_organization_id_uuid ON organization_members USING btree (organization_id);
CREATE INDEX idx_organization_member_user_id_uuid ON organization_members USING btree (user_id);
@ -1769,6 +1822,12 @@ ALTER TABLE ONLY jfrog_xray_scans
ALTER TABLE ONLY jfrog_xray_scans
ADD CONSTRAINT jfrog_xray_scans_workspace_id_fkey FOREIGN KEY (workspace_id) REFERENCES workspaces(id) ON DELETE CASCADE;
ALTER TABLE ONLY notification_messages
ADD CONSTRAINT notification_messages_notification_template_id_fkey FOREIGN KEY (notification_template_id) REFERENCES notification_templates(id) ON DELETE CASCADE;
ALTER TABLE ONLY notification_messages
ADD CONSTRAINT notification_messages_user_id_fkey FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE;
ALTER TABLE ONLY oauth2_provider_app_codes
ADD CONSTRAINT oauth2_provider_app_codes_app_id_fkey FOREIGN KEY (app_id) REFERENCES oauth2_provider_apps(id) ON DELETE CASCADE;

View File

@ -15,6 +15,8 @@ const (
ForeignKeyGroupsOrganizationID ForeignKeyConstraint = "groups_organization_id_fkey" // ALTER TABLE ONLY groups ADD CONSTRAINT groups_organization_id_fkey FOREIGN KEY (organization_id) REFERENCES organizations(id) ON DELETE CASCADE;
ForeignKeyJfrogXrayScansAgentID ForeignKeyConstraint = "jfrog_xray_scans_agent_id_fkey" // ALTER TABLE ONLY jfrog_xray_scans ADD CONSTRAINT jfrog_xray_scans_agent_id_fkey FOREIGN KEY (agent_id) REFERENCES workspace_agents(id) ON DELETE CASCADE;
ForeignKeyJfrogXrayScansWorkspaceID ForeignKeyConstraint = "jfrog_xray_scans_workspace_id_fkey" // ALTER TABLE ONLY jfrog_xray_scans ADD CONSTRAINT jfrog_xray_scans_workspace_id_fkey FOREIGN KEY (workspace_id) REFERENCES workspaces(id) ON DELETE CASCADE;
ForeignKeyNotificationMessagesNotificationTemplateID ForeignKeyConstraint = "notification_messages_notification_template_id_fkey" // ALTER TABLE ONLY notification_messages ADD CONSTRAINT notification_messages_notification_template_id_fkey FOREIGN KEY (notification_template_id) REFERENCES notification_templates(id) ON DELETE CASCADE;
ForeignKeyNotificationMessagesUserID ForeignKeyConstraint = "notification_messages_user_id_fkey" // ALTER TABLE ONLY notification_messages ADD CONSTRAINT notification_messages_user_id_fkey FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE;
ForeignKeyOauth2ProviderAppCodesAppID ForeignKeyConstraint = "oauth2_provider_app_codes_app_id_fkey" // ALTER TABLE ONLY oauth2_provider_app_codes ADD CONSTRAINT oauth2_provider_app_codes_app_id_fkey FOREIGN KEY (app_id) REFERENCES oauth2_provider_apps(id) ON DELETE CASCADE;
ForeignKeyOauth2ProviderAppCodesUserID ForeignKeyConstraint = "oauth2_provider_app_codes_user_id_fkey" // ALTER TABLE ONLY oauth2_provider_app_codes ADD CONSTRAINT oauth2_provider_app_codes_user_id_fkey FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE;
ForeignKeyOauth2ProviderAppSecretsAppID ForeignKeyConstraint = "oauth2_provider_app_secrets_app_id_fkey" // ALTER TABLE ONLY oauth2_provider_app_secrets ADD CONSTRAINT oauth2_provider_app_secrets_app_id_fkey FOREIGN KEY (app_id) REFERENCES oauth2_provider_apps(id) ON DELETE CASCADE;

View File

@ -0,0 +1,4 @@
DROP TABLE IF EXISTS notification_messages;
DROP TABLE IF EXISTS notification_templates;
DROP TYPE IF EXISTS notification_method;
DROP TYPE IF EXISTS notification_message_status;

View File

@ -0,0 +1,65 @@
CREATE TYPE notification_message_status AS ENUM (
'pending',
'leased',
'sent',
'permanent_failure',
'temporary_failure',
'unknown'
);
CREATE TYPE notification_method AS ENUM (
'smtp',
'webhook'
);
CREATE TABLE notification_templates
(
id uuid NOT NULL,
name text NOT NULL,
title_template text NOT NULL,
body_template text NOT NULL,
actions jsonb,
"group" text,
PRIMARY KEY (id),
UNIQUE (name)
);
COMMENT ON TABLE notification_templates IS 'Templates from which to create notification messages.';
CREATE TABLE notification_messages
(
id uuid NOT NULL,
notification_template_id uuid NOT NULL,
user_id uuid NOT NULL,
method notification_method NOT NULL,
status notification_message_status NOT NULL DEFAULT 'pending'::notification_message_status,
status_reason text,
created_by text NOT NULL,
payload jsonb NOT NULL,
attempt_count int DEFAULT 0,
targets uuid[],
created_at timestamp with time zone NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at timestamp with time zone,
leased_until timestamp with time zone,
next_retry_after timestamp with time zone,
PRIMARY KEY (id),
FOREIGN KEY (notification_template_id) REFERENCES notification_templates (id) ON DELETE CASCADE,
FOREIGN KEY (user_id) REFERENCES users (id) ON DELETE CASCADE
);
CREATE INDEX idx_notification_messages_status ON notification_messages (status);
-- TODO: autogenerate constants which reference the UUIDs
INSERT INTO notification_templates (id, name, title_template, body_template, "group", actions)
VALUES ('f517da0b-cdc9-410f-ab89-a86107c420ed', 'Workspace Deleted', E'Workspace "{{.Labels.name}}" deleted',
E'Hi {{.UserName}}\n\nYour workspace **{{.Labels.name}}** was deleted.\nThe specified reason was "**{{.Labels.reason}}**".',
'Workspace Events', '[
{
"label": "View workspaces",
"url": "{{ base_url }}/workspaces"
},
{
"label": "View templates",
"url": "{{ base_url }}/templates"
}
]'::jsonb);

View File

@ -0,0 +1,21 @@
DO
$$
DECLARE
template text;
BEGIN
SELECT 'You successfully did {{.thing}}!' INTO template;
INSERT INTO notification_templates (id, name, title_template, body_template, "group")
VALUES ('a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11', 'A', template, template, 'Group 1'),
('b0eebc99-9c0b-4ef8-bb6d-6bb9bd380a12', 'B', template, template, 'Group 1'),
('c0eebc99-9c0b-4ef8-bb6d-6bb9bd380a13', 'C', template, template, 'Group 2');
INSERT INTO users(id, email, username, hashed_password, created_at, updated_at, status, rbac_roles, deleted)
VALUES ('fc1511ef-4fcf-4a3b-98a1-8df64160e35a', 'githubuser@coder.com', 'githubuser', '\x', '2022-11-02 13:05:21.445455+02', '2022-11-02 13:05:21.445455+02', 'active', '{}', false) ON CONFLICT DO NOTHING;
INSERT INTO notification_messages (id, notification_template_id, user_id, method, created_by, payload)
VALUES (
gen_random_uuid(), 'a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11', 'fc1511ef-4fcf-4a3b-98a1-8df64160e35a', 'smtp'::notification_method, 'test', '{}'
);
END
$$;

View File

@ -660,6 +660,134 @@ func AllLoginTypeValues() []LoginType {
}
}
type NotificationMessageStatus string
const (
NotificationMessageStatusPending NotificationMessageStatus = "pending"
NotificationMessageStatusLeased NotificationMessageStatus = "leased"
NotificationMessageStatusSent NotificationMessageStatus = "sent"
NotificationMessageStatusPermanentFailure NotificationMessageStatus = "permanent_failure"
NotificationMessageStatusTemporaryFailure NotificationMessageStatus = "temporary_failure"
NotificationMessageStatusUnknown NotificationMessageStatus = "unknown"
)
func (e *NotificationMessageStatus) Scan(src interface{}) error {
switch s := src.(type) {
case []byte:
*e = NotificationMessageStatus(s)
case string:
*e = NotificationMessageStatus(s)
default:
return fmt.Errorf("unsupported scan type for NotificationMessageStatus: %T", src)
}
return nil
}
type NullNotificationMessageStatus struct {
NotificationMessageStatus NotificationMessageStatus `json:"notification_message_status"`
Valid bool `json:"valid"` // Valid is true if NotificationMessageStatus is not NULL
}
// Scan implements the Scanner interface.
func (ns *NullNotificationMessageStatus) Scan(value interface{}) error {
if value == nil {
ns.NotificationMessageStatus, ns.Valid = "", false
return nil
}
ns.Valid = true
return ns.NotificationMessageStatus.Scan(value)
}
// Value implements the driver Valuer interface.
func (ns NullNotificationMessageStatus) Value() (driver.Value, error) {
if !ns.Valid {
return nil, nil
}
return string(ns.NotificationMessageStatus), nil
}
func (e NotificationMessageStatus) Valid() bool {
switch e {
case NotificationMessageStatusPending,
NotificationMessageStatusLeased,
NotificationMessageStatusSent,
NotificationMessageStatusPermanentFailure,
NotificationMessageStatusTemporaryFailure,
NotificationMessageStatusUnknown:
return true
}
return false
}
func AllNotificationMessageStatusValues() []NotificationMessageStatus {
return []NotificationMessageStatus{
NotificationMessageStatusPending,
NotificationMessageStatusLeased,
NotificationMessageStatusSent,
NotificationMessageStatusPermanentFailure,
NotificationMessageStatusTemporaryFailure,
NotificationMessageStatusUnknown,
}
}
type NotificationMethod string
const (
NotificationMethodSmtp NotificationMethod = "smtp"
NotificationMethodWebhook NotificationMethod = "webhook"
)
func (e *NotificationMethod) Scan(src interface{}) error {
switch s := src.(type) {
case []byte:
*e = NotificationMethod(s)
case string:
*e = NotificationMethod(s)
default:
return fmt.Errorf("unsupported scan type for NotificationMethod: %T", src)
}
return nil
}
type NullNotificationMethod struct {
NotificationMethod NotificationMethod `json:"notification_method"`
Valid bool `json:"valid"` // Valid is true if NotificationMethod is not NULL
}
// Scan implements the Scanner interface.
func (ns *NullNotificationMethod) Scan(value interface{}) error {
if value == nil {
ns.NotificationMethod, ns.Valid = "", false
return nil
}
ns.Valid = true
return ns.NotificationMethod.Scan(value)
}
// Value implements the driver Valuer interface.
func (ns NullNotificationMethod) Value() (driver.Value, error) {
if !ns.Valid {
return nil, nil
}
return string(ns.NotificationMethod), nil
}
func (e NotificationMethod) Valid() bool {
switch e {
case NotificationMethodSmtp,
NotificationMethodWebhook:
return true
}
return false
}
func AllNotificationMethodValues() []NotificationMethod {
return []NotificationMethod{
NotificationMethodSmtp,
NotificationMethodWebhook,
}
}
type ParameterDestinationScheme string
const (
@ -1885,6 +2013,33 @@ type License struct {
UUID uuid.UUID `db:"uuid" json:"uuid"`
}
type NotificationMessage struct {
ID uuid.UUID `db:"id" json:"id"`
NotificationTemplateID uuid.UUID `db:"notification_template_id" json:"notification_template_id"`
UserID uuid.UUID `db:"user_id" json:"user_id"`
Method NotificationMethod `db:"method" json:"method"`
Status NotificationMessageStatus `db:"status" json:"status"`
StatusReason sql.NullString `db:"status_reason" json:"status_reason"`
CreatedBy string `db:"created_by" json:"created_by"`
Payload []byte `db:"payload" json:"payload"`
AttemptCount sql.NullInt32 `db:"attempt_count" json:"attempt_count"`
Targets []uuid.UUID `db:"targets" json:"targets"`
CreatedAt time.Time `db:"created_at" json:"created_at"`
UpdatedAt sql.NullTime `db:"updated_at" json:"updated_at"`
LeasedUntil sql.NullTime `db:"leased_until" json:"leased_until"`
NextRetryAfter sql.NullTime `db:"next_retry_after" json:"next_retry_after"`
}
// Templates from which to create notification messages.
type NotificationTemplate struct {
ID uuid.UUID `db:"id" json:"id"`
Name string `db:"name" json:"name"`
TitleTemplate string `db:"title_template" json:"title_template"`
BodyTemplate string `db:"body_template" json:"body_template"`
Actions []byte `db:"actions" json:"actions"`
Group sql.NullString `db:"group" json:"group"`
}
// A table used to configure apps that can use Coder as an OAuth2 provider, the reverse of what we are calling external authentication.
type OAuth2ProviderApp struct {
ID uuid.UUID `db:"id" json:"id"`

View File

@ -17,6 +17,18 @@ type sqlcQuerier interface {
// This must be called from within a transaction. The lock will be automatically
// released when the transaction ends.
AcquireLock(ctx context.Context, pgAdvisoryXactLock int64) error
// Acquires the lease for a given count of notification messages, to enable concurrent dequeuing and subsequent sending.
// Only rows that aren't already leased (or ones which are leased but have exceeded their lease period) are returned.
//
// A "lease" here refers to a notifier taking ownership of a notification_messages row. A lease survives for the duration
// of CODER_NOTIFICATIONS_LEASE_PERIOD. Once a message is delivered, its status is updated and the lease expires (set to NULL).
// If a message exceeds its lease, that implies the notifier did not shutdown cleanly, or the table update failed somehow,
// and the row will then be eligible to be dequeued by another notifier.
//
// SKIP LOCKED is used to jump over locked rows. This prevents multiple notifiers from acquiring the same messages.
// See: https://www.postgresql.org/docs/9.5/sql-select.html#SQL-FOR-UPDATE-SHARE
//
AcquireNotificationMessages(ctx context.Context, arg AcquireNotificationMessagesParams) ([]AcquireNotificationMessagesRow, error)
// Acquires the lock for a single job that isn't started, completed,
// canceled, and that matches an array of provisioner types.
//
@ -45,6 +57,8 @@ type sqlcQuerier interface {
// referenced by the latest build of a workspace.
ArchiveUnusedTemplateVersions(ctx context.Context, arg ArchiveUnusedTemplateVersionsParams) ([]uuid.UUID, error)
BatchUpdateWorkspaceLastUsedAt(ctx context.Context, arg BatchUpdateWorkspaceLastUsedAtParams) error
BulkMarkNotificationMessagesFailed(ctx context.Context, arg BulkMarkNotificationMessagesFailedParams) (int64, error)
BulkMarkNotificationMessagesSent(ctx context.Context, arg BulkMarkNotificationMessagesSentParams) (int64, error)
CleanTailnetCoordinators(ctx context.Context) error
CleanTailnetLostPeers(ctx context.Context) error
CleanTailnetTunnels(ctx context.Context) error
@ -65,6 +79,8 @@ type sqlcQuerier interface {
DeleteOAuth2ProviderAppCodesByAppAndUserID(ctx context.Context, arg DeleteOAuth2ProviderAppCodesByAppAndUserIDParams) error
DeleteOAuth2ProviderAppSecretByID(ctx context.Context, id uuid.UUID) error
DeleteOAuth2ProviderAppTokensByAppAndUserID(ctx context.Context, arg DeleteOAuth2ProviderAppTokensByAppAndUserIDParams) error
// Delete all notification messages which have not been updated for over a week.
DeleteOldNotificationMessages(ctx context.Context) error
// Delete provisioner daemons that have been created at least a week ago
// and have not connected to coderd since a week.
// A provisioner daemon with "zeroed" last_seen_at column indicates possible
@ -84,7 +100,10 @@ type sqlcQuerier interface {
DeleteTailnetTunnel(ctx context.Context, arg DeleteTailnetTunnelParams) (DeleteTailnetTunnelRow, error)
DeleteWorkspaceAgentPortShare(ctx context.Context, arg DeleteWorkspaceAgentPortShareParams) error
DeleteWorkspaceAgentPortSharesByTemplate(ctx context.Context, templateID uuid.UUID) error
EnqueueNotificationMessage(ctx context.Context, arg EnqueueNotificationMessageParams) (NotificationMessage, error)
FavoriteWorkspace(ctx context.Context, id uuid.UUID) error
// This is used to build up the notification_message's JSON payload.
FetchNewMessageMetadata(ctx context.Context, arg FetchNewMessageMetadataParams) (FetchNewMessageMetadataRow, error)
GetAPIKeyByID(ctx context.Context, id string) (APIKey, error)
// there is no unique constraint on empty token names
GetAPIKeyByName(ctx context.Context, arg GetAPIKeyByNameParams) (APIKey, error)

View File

@ -3285,6 +3285,297 @@ func (q *sqlQuerier) TryAcquireLock(ctx context.Context, pgTryAdvisoryXactLock i
return pg_try_advisory_xact_lock, err
}
const acquireNotificationMessages = `-- name: AcquireNotificationMessages :many
WITH acquired AS (
UPDATE
notification_messages
SET updated_at = NOW(),
status = 'leased'::notification_message_status,
status_reason = 'Leased by notifier ' || $1::uuid,
leased_until = NOW() + CONCAT($2::int, ' seconds')::interval
WHERE id IN (SELECT nm.id
FROM notification_messages AS nm
WHERE (
(
-- message is in acquirable states
nm.status IN (
'pending'::notification_message_status,
'temporary_failure'::notification_message_status
)
)
-- or somehow the message was left in leased for longer than its lease period
OR (
nm.status = 'leased'::notification_message_status
AND nm.leased_until < NOW()
)
)
AND (
-- exclude all messages which have exceeded the max attempts; these will be purged later
nm.attempt_count IS NULL OR nm.attempt_count < $3::int
)
-- if set, do not retry until we've exceeded the wait time
AND (
CASE
WHEN nm.next_retry_after IS NOT NULL THEN nm.next_retry_after < NOW()
ELSE true
END
)
ORDER BY nm.created_at ASC
-- Ensure that multiple concurrent readers cannot retrieve the same rows
FOR UPDATE OF nm
SKIP LOCKED
LIMIT $4)
RETURNING id, notification_template_id, user_id, method, status, status_reason, created_by, payload, attempt_count, targets, created_at, updated_at, leased_until, next_retry_after)
SELECT
-- message
nm.id,
nm.payload,
nm.method,
nm.created_by,
-- template
nt.title_template,
nt.body_template
FROM acquired nm
JOIN notification_templates nt ON nm.notification_template_id = nt.id
`
type AcquireNotificationMessagesParams struct {
NotifierID uuid.UUID `db:"notifier_id" json:"notifier_id"`
LeaseSeconds int32 `db:"lease_seconds" json:"lease_seconds"`
MaxAttemptCount int32 `db:"max_attempt_count" json:"max_attempt_count"`
Count int32 `db:"count" json:"count"`
}
type AcquireNotificationMessagesRow struct {
ID uuid.UUID `db:"id" json:"id"`
Payload json.RawMessage `db:"payload" json:"payload"`
Method NotificationMethod `db:"method" json:"method"`
CreatedBy string `db:"created_by" json:"created_by"`
TitleTemplate string `db:"title_template" json:"title_template"`
BodyTemplate string `db:"body_template" json:"body_template"`
}
// Acquires the lease for a given count of notification messages, to enable concurrent dequeuing and subsequent sending.
// Only rows that aren't already leased (or ones which are leased but have exceeded their lease period) are returned.
//
// A "lease" here refers to a notifier taking ownership of a notification_messages row. A lease survives for the duration
// of CODER_NOTIFICATIONS_LEASE_PERIOD. Once a message is delivered, its status is updated and the lease expires (set to NULL).
// If a message exceeds its lease, that implies the notifier did not shutdown cleanly, or the table update failed somehow,
// and the row will then be eligible to be dequeued by another notifier.
//
// SKIP LOCKED is used to jump over locked rows. This prevents multiple notifiers from acquiring the same messages.
// See: https://www.postgresql.org/docs/9.5/sql-select.html#SQL-FOR-UPDATE-SHARE
func (q *sqlQuerier) AcquireNotificationMessages(ctx context.Context, arg AcquireNotificationMessagesParams) ([]AcquireNotificationMessagesRow, error) {
rows, err := q.db.QueryContext(ctx, acquireNotificationMessages,
arg.NotifierID,
arg.LeaseSeconds,
arg.MaxAttemptCount,
arg.Count,
)
if err != nil {
return nil, err
}
defer rows.Close()
var items []AcquireNotificationMessagesRow
for rows.Next() {
var i AcquireNotificationMessagesRow
if err := rows.Scan(
&i.ID,
&i.Payload,
&i.Method,
&i.CreatedBy,
&i.TitleTemplate,
&i.BodyTemplate,
); err != nil {
return nil, err
}
items = append(items, i)
}
if err := rows.Close(); err != nil {
return nil, err
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const bulkMarkNotificationMessagesFailed = `-- name: BulkMarkNotificationMessagesFailed :execrows
UPDATE notification_messages
SET updated_at = subquery.failed_at,
attempt_count = attempt_count + 1,
status = CASE
WHEN attempt_count + 1 < $1::int THEN subquery.status
ELSE 'permanent_failure'::notification_message_status END,
status_reason = subquery.status_reason,
leased_until = NULL,
next_retry_after = CASE
WHEN (attempt_count + 1 < $1::int)
THEN NOW() + CONCAT($2::int, ' seconds')::interval END
FROM (SELECT UNNEST($3::uuid[]) AS id,
UNNEST($4::timestamptz[]) AS failed_at,
UNNEST($5::notification_message_status[]) AS status,
UNNEST($6::text[]) AS status_reason) AS subquery
WHERE notification_messages.id = subquery.id
`
type BulkMarkNotificationMessagesFailedParams struct {
MaxAttempts int32 `db:"max_attempts" json:"max_attempts"`
RetryInterval int32 `db:"retry_interval" json:"retry_interval"`
IDs []uuid.UUID `db:"ids" json:"ids"`
FailedAts []time.Time `db:"failed_ats" json:"failed_ats"`
Statuses []NotificationMessageStatus `db:"statuses" json:"statuses"`
StatusReasons []string `db:"status_reasons" json:"status_reasons"`
}
func (q *sqlQuerier) BulkMarkNotificationMessagesFailed(ctx context.Context, arg BulkMarkNotificationMessagesFailedParams) (int64, error) {
result, err := q.db.ExecContext(ctx, bulkMarkNotificationMessagesFailed,
arg.MaxAttempts,
arg.RetryInterval,
pq.Array(arg.IDs),
pq.Array(arg.FailedAts),
pq.Array(arg.Statuses),
pq.Array(arg.StatusReasons),
)
if err != nil {
return 0, err
}
return result.RowsAffected()
}
const bulkMarkNotificationMessagesSent = `-- name: BulkMarkNotificationMessagesSent :execrows
UPDATE notification_messages
SET updated_at = new_values.sent_at,
attempt_count = attempt_count + 1,
status = 'sent'::notification_message_status,
status_reason = NULL,
leased_until = NULL,
next_retry_after = NULL
FROM (SELECT UNNEST($1::uuid[]) AS id,
UNNEST($2::timestamptz[]) AS sent_at)
AS new_values
WHERE notification_messages.id = new_values.id
`
type BulkMarkNotificationMessagesSentParams struct {
IDs []uuid.UUID `db:"ids" json:"ids"`
SentAts []time.Time `db:"sent_ats" json:"sent_ats"`
}
func (q *sqlQuerier) BulkMarkNotificationMessagesSent(ctx context.Context, arg BulkMarkNotificationMessagesSentParams) (int64, error) {
result, err := q.db.ExecContext(ctx, bulkMarkNotificationMessagesSent, pq.Array(arg.IDs), pq.Array(arg.SentAts))
if err != nil {
return 0, err
}
return result.RowsAffected()
}
const deleteOldNotificationMessages = `-- name: DeleteOldNotificationMessages :exec
DELETE
FROM notification_messages
WHERE id IN
(SELECT id
FROM notification_messages AS nested
WHERE nested.updated_at < NOW() - INTERVAL '7 days')
`
// Delete all notification messages which have not been updated for over a week.
func (q *sqlQuerier) DeleteOldNotificationMessages(ctx context.Context) error {
_, err := q.db.ExecContext(ctx, deleteOldNotificationMessages)
return err
}
const enqueueNotificationMessage = `-- name: EnqueueNotificationMessage :one
INSERT INTO notification_messages (id, notification_template_id, user_id, method, payload, targets, created_by)
VALUES ($1,
$2,
$3,
$4::notification_method,
$5::jsonb,
$6,
$7)
RETURNING id, notification_template_id, user_id, method, status, status_reason, created_by, payload, attempt_count, targets, created_at, updated_at, leased_until, next_retry_after
`
type EnqueueNotificationMessageParams struct {
ID uuid.UUID `db:"id" json:"id"`
NotificationTemplateID uuid.UUID `db:"notification_template_id" json:"notification_template_id"`
UserID uuid.UUID `db:"user_id" json:"user_id"`
Method NotificationMethod `db:"method" json:"method"`
Payload json.RawMessage `db:"payload" json:"payload"`
Targets []uuid.UUID `db:"targets" json:"targets"`
CreatedBy string `db:"created_by" json:"created_by"`
}
func (q *sqlQuerier) EnqueueNotificationMessage(ctx context.Context, arg EnqueueNotificationMessageParams) (NotificationMessage, error) {
row := q.db.QueryRowContext(ctx, enqueueNotificationMessage,
arg.ID,
arg.NotificationTemplateID,
arg.UserID,
arg.Method,
arg.Payload,
pq.Array(arg.Targets),
arg.CreatedBy,
)
var i NotificationMessage
err := row.Scan(
&i.ID,
&i.NotificationTemplateID,
&i.UserID,
&i.Method,
&i.Status,
&i.StatusReason,
&i.CreatedBy,
&i.Payload,
&i.AttemptCount,
pq.Array(&i.Targets),
&i.CreatedAt,
&i.UpdatedAt,
&i.LeasedUntil,
&i.NextRetryAfter,
)
return i, err
}
const fetchNewMessageMetadata = `-- name: FetchNewMessageMetadata :one
SELECT nt.name AS notification_name,
nt.actions AS actions,
u.id AS user_id,
u.email AS user_email,
COALESCE(NULLIF(u.name, ''), NULLIF(u.username, ''))::text AS user_name
FROM notification_templates nt,
users u
WHERE nt.id = $1
AND u.id = $2
`
type FetchNewMessageMetadataParams struct {
NotificationTemplateID uuid.UUID `db:"notification_template_id" json:"notification_template_id"`
UserID uuid.UUID `db:"user_id" json:"user_id"`
}
type FetchNewMessageMetadataRow struct {
NotificationName string `db:"notification_name" json:"notification_name"`
Actions []byte `db:"actions" json:"actions"`
UserID uuid.UUID `db:"user_id" json:"user_id"`
UserEmail string `db:"user_email" json:"user_email"`
UserName string `db:"user_name" json:"user_name"`
}
// This is used to build up the notification_message's JSON payload.
func (q *sqlQuerier) FetchNewMessageMetadata(ctx context.Context, arg FetchNewMessageMetadataParams) (FetchNewMessageMetadataRow, error) {
row := q.db.QueryRowContext(ctx, fetchNewMessageMetadata, arg.NotificationTemplateID, arg.UserID)
var i FetchNewMessageMetadataRow
err := row.Scan(
&i.NotificationName,
&i.Actions,
&i.UserID,
&i.UserEmail,
&i.UserName,
)
return i, err
}
const deleteOAuth2ProviderAppByID = `-- name: DeleteOAuth2ProviderAppByID :exec
DELETE FROM oauth2_provider_apps WHERE id = $1
`

View File

@ -0,0 +1,127 @@
-- name: FetchNewMessageMetadata :one
-- This is used to build up the notification_message's JSON payload.
SELECT nt.name AS notification_name,
nt.actions AS actions,
u.id AS user_id,
u.email AS user_email,
COALESCE(NULLIF(u.name, ''), NULLIF(u.username, ''))::text AS user_name
FROM notification_templates nt,
users u
WHERE nt.id = @notification_template_id
AND u.id = @user_id;
-- name: EnqueueNotificationMessage :one
INSERT INTO notification_messages (id, notification_template_id, user_id, method, payload, targets, created_by)
VALUES (@id,
@notification_template_id,
@user_id,
@method::notification_method,
@payload::jsonb,
@targets,
@created_by)
RETURNING *;
-- Acquires the lease for a given count of notification messages, to enable concurrent dequeuing and subsequent sending.
-- Only rows that aren't already leased (or ones which are leased but have exceeded their lease period) are returned.
--
-- A "lease" here refers to a notifier taking ownership of a notification_messages row. A lease survives for the duration
-- of CODER_NOTIFICATIONS_LEASE_PERIOD. Once a message is delivered, its status is updated and the lease expires (set to NULL).
-- If a message exceeds its lease, that implies the notifier did not shutdown cleanly, or the table update failed somehow,
-- and the row will then be eligible to be dequeued by another notifier.
--
-- SKIP LOCKED is used to jump over locked rows. This prevents multiple notifiers from acquiring the same messages.
-- See: https://www.postgresql.org/docs/9.5/sql-select.html#SQL-FOR-UPDATE-SHARE
--
-- name: AcquireNotificationMessages :many
WITH acquired AS (
UPDATE
notification_messages
SET updated_at = NOW(),
status = 'leased'::notification_message_status,
status_reason = 'Leased by notifier ' || sqlc.arg('notifier_id')::uuid,
leased_until = NOW() + CONCAT(sqlc.arg('lease_seconds')::int, ' seconds')::interval
WHERE id IN (SELECT nm.id
FROM notification_messages AS nm
WHERE (
(
-- message is in acquirable states
nm.status IN (
'pending'::notification_message_status,
'temporary_failure'::notification_message_status
)
)
-- or somehow the message was left in leased for longer than its lease period
OR (
nm.status = 'leased'::notification_message_status
AND nm.leased_until < NOW()
)
)
AND (
-- exclude all messages which have exceeded the max attempts; these will be purged later
nm.attempt_count IS NULL OR nm.attempt_count < sqlc.arg('max_attempt_count')::int
)
-- if set, do not retry until we've exceeded the wait time
AND (
CASE
WHEN nm.next_retry_after IS NOT NULL THEN nm.next_retry_after < NOW()
ELSE true
END
)
ORDER BY nm.created_at ASC
-- Ensure that multiple concurrent readers cannot retrieve the same rows
FOR UPDATE OF nm
SKIP LOCKED
LIMIT sqlc.arg('count'))
RETURNING *)
SELECT
-- message
nm.id,
nm.payload,
nm.method,
nm.created_by,
-- template
nt.title_template,
nt.body_template
FROM acquired nm
JOIN notification_templates nt ON nm.notification_template_id = nt.id;
-- name: BulkMarkNotificationMessagesFailed :execrows
UPDATE notification_messages
SET updated_at = subquery.failed_at,
attempt_count = attempt_count + 1,
status = CASE
WHEN attempt_count + 1 < @max_attempts::int THEN subquery.status
ELSE 'permanent_failure'::notification_message_status END,
status_reason = subquery.status_reason,
leased_until = NULL,
next_retry_after = CASE
WHEN (attempt_count + 1 < @max_attempts::int)
THEN NOW() + CONCAT(@retry_interval::int, ' seconds')::interval END
FROM (SELECT UNNEST(@ids::uuid[]) AS id,
UNNEST(@failed_ats::timestamptz[]) AS failed_at,
UNNEST(@statuses::notification_message_status[]) AS status,
UNNEST(@status_reasons::text[]) AS status_reason) AS subquery
WHERE notification_messages.id = subquery.id;
-- name: BulkMarkNotificationMessagesSent :execrows
UPDATE notification_messages
SET updated_at = new_values.sent_at,
attempt_count = attempt_count + 1,
status = 'sent'::notification_message_status,
status_reason = NULL,
leased_until = NULL,
next_retry_after = NULL
FROM (SELECT UNNEST(@ids::uuid[]) AS id,
UNNEST(@sent_ats::timestamptz[]) AS sent_at)
AS new_values
WHERE notification_messages.id = new_values.id;
-- Delete all notification messages which have not been updated for over a week.
-- name: DeleteOldNotificationMessages :exec
DELETE
FROM notification_messages
WHERE id IN
(SELECT id
FROM notification_messages AS nested
WHERE nested.updated_at < NOW() - INTERVAL '7 days');

View File

@ -64,6 +64,12 @@ sql:
- column: "template_usage_stats.app_usage_mins"
go_type:
type: "StringMapOfInt"
- column: "notification_templates.actions"
go_type:
type: "[]byte"
- column: "notification_messages.payload"
go_type:
type: "[]byte"
rename:
template: TemplateTable
template_with_user: Template

View File

@ -23,6 +23,9 @@ const (
UniqueJfrogXrayScansPkey UniqueConstraint = "jfrog_xray_scans_pkey" // ALTER TABLE ONLY jfrog_xray_scans ADD CONSTRAINT jfrog_xray_scans_pkey PRIMARY KEY (agent_id, workspace_id);
UniqueLicensesJWTKey UniqueConstraint = "licenses_jwt_key" // ALTER TABLE ONLY licenses ADD CONSTRAINT licenses_jwt_key UNIQUE (jwt);
UniqueLicensesPkey UniqueConstraint = "licenses_pkey" // ALTER TABLE ONLY licenses ADD CONSTRAINT licenses_pkey PRIMARY KEY (id);
UniqueNotificationMessagesPkey UniqueConstraint = "notification_messages_pkey" // ALTER TABLE ONLY notification_messages ADD CONSTRAINT notification_messages_pkey PRIMARY KEY (id);
UniqueNotificationTemplatesNameKey UniqueConstraint = "notification_templates_name_key" // ALTER TABLE ONLY notification_templates ADD CONSTRAINT notification_templates_name_key UNIQUE (name);
UniqueNotificationTemplatesPkey UniqueConstraint = "notification_templates_pkey" // ALTER TABLE ONLY notification_templates ADD CONSTRAINT notification_templates_pkey PRIMARY KEY (id);
UniqueOauth2ProviderAppCodesPkey UniqueConstraint = "oauth2_provider_app_codes_pkey" // ALTER TABLE ONLY oauth2_provider_app_codes ADD CONSTRAINT oauth2_provider_app_codes_pkey PRIMARY KEY (id);
UniqueOauth2ProviderAppCodesSecretPrefixKey UniqueConstraint = "oauth2_provider_app_codes_secret_prefix_key" // ALTER TABLE ONLY oauth2_provider_app_codes ADD CONSTRAINT oauth2_provider_app_codes_secret_prefix_key UNIQUE (secret_prefix);
UniqueOauth2ProviderAppSecretsPkey UniqueConstraint = "oauth2_provider_app_secrets_pkey" // ALTER TABLE ONLY oauth2_provider_app_secrets ADD CONSTRAINT oauth2_provider_app_secrets_pkey PRIMARY KEY (id);