feat: close preprod observability loop
This commit is contained in:
@@ -48,6 +48,7 @@ internal static class ObservabilityRegistration
|
||||
}
|
||||
|
||||
builder.Services.AddSingleton<SocializeMetrics>();
|
||||
builder.Services.AddHostedService<WorkflowHealthSamplerService>();
|
||||
builder.Services
|
||||
.AddOpenTelemetry()
|
||||
.ConfigureResource(resource => resource.AddService(
|
||||
|
||||
@@ -19,6 +19,8 @@ internal sealed class SocializeMetrics : IDisposable
|
||||
private readonly Counter<long> _organizationCreatedCounter;
|
||||
private readonly Counter<long> _workspaceCreatedCounter;
|
||||
private readonly Counter<long> _workspaceInviteCreatedCounter;
|
||||
private readonly object _workflowHealthLock = new();
|
||||
private WorkflowHealthSnapshot _workflowHealthSnapshot = WorkflowHealthSnapshot.Empty;
|
||||
|
||||
public SocializeMetrics()
|
||||
{
|
||||
@@ -58,6 +60,27 @@ internal sealed class SocializeMetrics : IDisposable
|
||||
_backgroundJobRunCounter = Meter.CreateCounter<long>(
|
||||
"socialize.background_job.runs",
|
||||
description: "Background job runs partitioned by job and outcome.");
|
||||
|
||||
Meter.CreateObservableGauge(
|
||||
"socialize.workflow.content_items",
|
||||
ObserveContentItemCounts,
|
||||
description: "Current content item counts by status.");
|
||||
Meter.CreateObservableGauge(
|
||||
"socialize.workflow.feedback_reports",
|
||||
ObserveFeedbackReportCounts,
|
||||
description: "Current feedback report counts by status.");
|
||||
Meter.CreateObservableGauge(
|
||||
"socialize.workflow.pending_invites",
|
||||
ObservePendingInviteCount,
|
||||
description: "Current pending workspace invite count.");
|
||||
Meter.CreateObservableGauge(
|
||||
"socialize.workflow.stale_in_approval",
|
||||
ObserveStaleApprovalCount,
|
||||
description: "Current count of content items in approval longer than the configured stale threshold.");
|
||||
Meter.CreateObservableGauge(
|
||||
"socialize.workflow.active_workspaces",
|
||||
ObserveActiveWorkspaceCounts,
|
||||
description: "Current active workspace counts by observation window.");
|
||||
}
|
||||
|
||||
public Meter Meter { get; }
|
||||
@@ -150,9 +173,86 @@ internal sealed class SocializeMetrics : IDisposable
|
||||
new KeyValuePair<string, object?>("outcome", succeeded ? "success" : "failure"));
|
||||
}
|
||||
|
||||
public void UpdateWorkflowHealth(WorkflowHealthSnapshot snapshot)
|
||||
{
|
||||
lock (_workflowHealthLock)
|
||||
{
|
||||
_workflowHealthSnapshot = snapshot;
|
||||
}
|
||||
}
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
Meter.Dispose();
|
||||
ActivitySource.Dispose();
|
||||
}
|
||||
|
||||
private Measurement<int>[] ObserveContentItemCounts()
|
||||
{
|
||||
WorkflowHealthSnapshot snapshot = GetWorkflowHealthSnapshot();
|
||||
return snapshot.ContentItemsByStatus
|
||||
.Select(pair => new Measurement<int>(
|
||||
pair.Value,
|
||||
new KeyValuePair<string, object?>("status", pair.Key)))
|
||||
.ToArray();
|
||||
}
|
||||
|
||||
private Measurement<int>[] ObserveFeedbackReportCounts()
|
||||
{
|
||||
WorkflowHealthSnapshot snapshot = GetWorkflowHealthSnapshot();
|
||||
return snapshot.FeedbackReportsByStatus
|
||||
.Select(pair => new Measurement<int>(
|
||||
pair.Value,
|
||||
new KeyValuePair<string, object?>("status", pair.Key)))
|
||||
.ToArray();
|
||||
}
|
||||
|
||||
private Measurement<int> ObservePendingInviteCount()
|
||||
{
|
||||
return new Measurement<int>(GetWorkflowHealthSnapshot().PendingInviteCount);
|
||||
}
|
||||
|
||||
private Measurement<int> ObserveStaleApprovalCount()
|
||||
{
|
||||
return new Measurement<int>(GetWorkflowHealthSnapshot().StaleInApprovalCount);
|
||||
}
|
||||
|
||||
private Measurement<int>[] ObserveActiveWorkspaceCounts()
|
||||
{
|
||||
WorkflowHealthSnapshot snapshot = GetWorkflowHealthSnapshot();
|
||||
return
|
||||
[
|
||||
new Measurement<int>(
|
||||
snapshot.ActiveWorkspaces24Hours,
|
||||
new KeyValuePair<string, object?>("window", "24h")),
|
||||
new Measurement<int>(
|
||||
snapshot.ActiveWorkspaces7Days,
|
||||
new KeyValuePair<string, object?>("window", "7d")),
|
||||
];
|
||||
}
|
||||
|
||||
private WorkflowHealthSnapshot GetWorkflowHealthSnapshot()
|
||||
{
|
||||
lock (_workflowHealthLock)
|
||||
{
|
||||
return _workflowHealthSnapshot;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
internal sealed record WorkflowHealthSnapshot(
|
||||
IReadOnlyDictionary<string, int> ContentItemsByStatus,
|
||||
IReadOnlyDictionary<string, int> FeedbackReportsByStatus,
|
||||
int PendingInviteCount,
|
||||
int StaleInApprovalCount,
|
||||
int ActiveWorkspaces24Hours,
|
||||
int ActiveWorkspaces7Days)
|
||||
{
|
||||
public static WorkflowHealthSnapshot Empty { get; } = new(
|
||||
new Dictionary<string, int>(StringComparer.Ordinal),
|
||||
new Dictionary<string, int>(StringComparer.Ordinal),
|
||||
0,
|
||||
0,
|
||||
0,
|
||||
0);
|
||||
}
|
||||
|
||||
@@ -0,0 +1,102 @@
|
||||
using Microsoft.EntityFrameworkCore;
|
||||
using Socialize.Api.Data;
|
||||
using Socialize.Api.Modules.Feedback.Data;
|
||||
using Socialize.Api.Modules.Workspaces.Data;
|
||||
|
||||
namespace Socialize.Api.Infrastructure.Observability;
|
||||
|
||||
internal sealed class WorkflowHealthSamplerService(
|
||||
IServiceScopeFactory scopeFactory,
|
||||
SocializeMetrics metrics,
|
||||
ILogger<WorkflowHealthSamplerService> logger)
|
||||
: BackgroundService
|
||||
{
|
||||
private static readonly TimeSpan SampleInterval = TimeSpan.FromMinutes(5);
|
||||
private static readonly TimeSpan StaleApprovalThreshold = TimeSpan.FromDays(3);
|
||||
|
||||
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
|
||||
{
|
||||
await SampleAsync(stoppingToken);
|
||||
|
||||
using PeriodicTimer timer = new(SampleInterval);
|
||||
while (!stoppingToken.IsCancellationRequested)
|
||||
{
|
||||
try
|
||||
{
|
||||
await timer.WaitForNextTickAsync(stoppingToken);
|
||||
await SampleAsync(stoppingToken);
|
||||
}
|
||||
catch (OperationCanceledException ex) when (stoppingToken.IsCancellationRequested)
|
||||
{
|
||||
logger.LogDebug(ex, "Workflow health sampler stopped.");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private async Task SampleAsync(CancellationToken stoppingToken)
|
||||
{
|
||||
try
|
||||
{
|
||||
using IServiceScope scope = scopeFactory.CreateScope();
|
||||
AppDbContext dbContext = scope.ServiceProvider.GetRequiredService<AppDbContext>();
|
||||
DateTimeOffset now = DateTimeOffset.UtcNow;
|
||||
DateTimeOffset staleApprovalCutoff = now.Subtract(StaleApprovalThreshold);
|
||||
DateTimeOffset active24HourCutoff = now.AddHours(-24);
|
||||
DateTimeOffset active7DayCutoff = now.AddDays(-7);
|
||||
|
||||
Dictionary<string, int> contentItemsByStatus = await dbContext.ContentItems
|
||||
.GroupBy(item => item.Status)
|
||||
.Select(group => new { Status = group.Key, Count = group.Count() })
|
||||
.ToDictionaryAsync(group => group.Status, group => group.Count, StringComparer.Ordinal, stoppingToken);
|
||||
|
||||
Dictionary<string, int> feedbackReportsByStatus = await dbContext.FeedbackReports
|
||||
.GroupBy(report => report.Status)
|
||||
.Select(group => new { Status = group.Key, Count = group.Count() })
|
||||
.ToDictionaryAsync(
|
||||
group => group.Status == FeedbackStatus.WontDo ? "WontDo" : group.Status.ToString(),
|
||||
group => group.Count,
|
||||
StringComparer.Ordinal,
|
||||
stoppingToken);
|
||||
|
||||
int pendingInviteCount = await dbContext.WorkspaceInvites
|
||||
.CountAsync(invite => invite.Status == WorkspaceInviteStatuses.Pending, stoppingToken);
|
||||
|
||||
int staleInApprovalCount = await dbContext.ContentItems
|
||||
.CountAsync(
|
||||
item => item.Status == "In approval" && item.CreatedAt <= staleApprovalCutoff,
|
||||
stoppingToken);
|
||||
|
||||
int activeWorkspaces24Hours = await dbContext.ContentItemActivityEntries
|
||||
.Where(entry => entry.CreatedAt >= active24HourCutoff)
|
||||
.Select(entry => entry.WorkspaceId)
|
||||
.Distinct()
|
||||
.CountAsync(stoppingToken);
|
||||
|
||||
int activeWorkspaces7Days = await dbContext.ContentItemActivityEntries
|
||||
.Where(entry => entry.CreatedAt >= active7DayCutoff)
|
||||
.Select(entry => entry.WorkspaceId)
|
||||
.Distinct()
|
||||
.CountAsync(stoppingToken);
|
||||
|
||||
metrics.UpdateWorkflowHealth(new WorkflowHealthSnapshot(
|
||||
contentItemsByStatus,
|
||||
feedbackReportsByStatus,
|
||||
pendingInviteCount,
|
||||
staleInApprovalCount,
|
||||
activeWorkspaces24Hours,
|
||||
activeWorkspaces7Days));
|
||||
metrics.RecordBackgroundJobRun(nameof(WorkflowHealthSamplerService), true);
|
||||
}
|
||||
catch (OperationCanceledException ex) when (stoppingToken.IsCancellationRequested)
|
||||
{
|
||||
logger.LogDebug(ex, "Workflow health sampler stopped.");
|
||||
}
|
||||
#pragma warning disable CA1031
|
||||
catch (Exception ex)
|
||||
{
|
||||
metrics.RecordBackgroundJobRun(nameof(WorkflowHealthSamplerService), false);
|
||||
logger.LogError(ex, "Workflow health sampling failed.");
|
||||
}
|
||||
#pragma warning restore CA1031
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user