57 changes: 57 additions & 0 deletions enterprise/server/usage/BUILD
@@ -15,8 +15,10 @@ go_library(
"//server/metrics",
"//server/real_environment",
"//server/tables",
"//server/usage/sku",
"//server/util/alert",
"//server/util/authutil",
"//server/util/clickhouse/schema",
"//server/util/db",
"//server/util/log",
"//server/util/status",
@@ -39,7 +41,54 @@ go_test(
"//server/interfaces",
"//server/tables",
"//server/testutil/testauth",
"//server/testutil/testclickhouse",
"//server/testutil/testenv",
"//server/usage/sku",
"//server/util/clickhouse",
"//server/util/clickhouse/schema",
"//server/util/db",
"//server/util/testing/flags",
"@com_github_go_redis_redis_v8//:redis",
"@com_github_go_sql_driver_mysql//:mysql",
"@com_github_jonboulle_clockwork//:clockwork",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
"@io_gorm_gorm//:gorm",
"@org_golang_x_sync//errgroup",
],
)

go_test(
name = "usage_test_clickhouse",
size = "small",
srcs = ["usage_test.go"],
args = [
"-test_clickhouse_enabled=true",
# TODO(bduffany): Remove test filter, add ClickHouse test logic to all
# test cases, and enable test sharding before we fully enable
# ClickHouse in prod.
"-test.run=TestUsageTracker_Increment_MultipleGroupsInSameCollectionPeriod",
],
exec_properties = {
"test.workload-isolation-type": "firecracker",
"test.init-dockerd": "true",
"test.recycle-runner": "true",
"test.runner-recycling-key": "clickhouse:25.3",
},
tags = ["docker"],
Contributor: what does this tag do?

Member Author: It indicates that the test requires docker to be available in the execution environment. The effect is that the test is excluded when running bazel test //... locally.
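For reference, that exclusion is typically wired up through a tag filter rather than any behavior built into the tag itself; a minimal sketch, assuming a .bazelrc entry along these lines (the exact filter flags in this repo may differ):

# Hypothetical .bazelrc entry: skip docker-tagged tests by default.
test --test_tag_filters=-docker

# The tagged test can still be run explicitly by selecting the tag:
bazel test --test_tag_filters=docker //enterprise/server/usage:usage_test_clickhouse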

deps = [
":usage",
"//enterprise/server/backends/redis_metrics_collector",
"//enterprise/server/testutil/testredis",
"//enterprise/server/util/redisutil",
"//server/interfaces",
"//server/tables",
"//server/testutil/testauth",
"//server/testutil/testclickhouse",
"//server/testutil/testenv",
"//server/usage/sku",
"//server/util/clickhouse",
"//server/util/clickhouse/schema",
"//server/util/db",
"//server/util/testing/flags",
"@com_github_go_redis_redis_v8//:redis",
@@ -78,7 +127,11 @@ go_test(
"//server/interfaces",
"//server/tables",
"//server/testutil/testauth",
"//server/testutil/testclickhouse",
"//server/testutil/testenv",
"//server/usage/sku",
"//server/util/clickhouse",
"//server/util/clickhouse/schema",
"//server/util/db",
"//server/util/testing/flags",
"@com_github_go_redis_redis_v8//:redis",
@@ -117,7 +170,11 @@ go_test(
"//server/interfaces",
"//server/tables",
"//server/testutil/testauth",
"//server/testutil/testclickhouse",
"//server/testutil/testenv",
"//server/usage/sku",
"//server/util/clickhouse",
"//server/util/clickhouse/schema",
"//server/util/db",
"//server/util/testing/flags",
"@com_github_go_redis_redis_v8//:redis",
163 changes: 160 additions & 3 deletions enterprise/server/usage/usage.go
@@ -15,6 +15,7 @@ import (
"github.com/buildbuddy-io/buildbuddy/server/metrics"
"github.com/buildbuddy-io/buildbuddy/server/real_environment"
"github.com/buildbuddy-io/buildbuddy/server/tables"
"github.com/buildbuddy-io/buildbuddy/server/usage/sku"
"github.com/buildbuddy-io/buildbuddy/server/util/alert"
"github.com/buildbuddy-io/buildbuddy/server/util/authutil"
"github.com/buildbuddy-io/buildbuddy/server/util/db"
@@ -26,10 +27,12 @@ import (
"github.com/prometheus/client_golang/prometheus"

usage_config "github.com/buildbuddy-io/buildbuddy/enterprise/server/usage/config"
olaptables "github.com/buildbuddy-io/buildbuddy/server/util/clickhouse/schema"
)

var (
region = flag.String("app.region", "", "The region in which the app is running.")
region = flag.String("app.region", "", "The region in which the app is running.")
writeToOLAP = flag.Bool("app.write_usage_to_olap_db", false, "If true, write usage data to OLAP DB in addition to the primary DB write.")
)

const (
@@ -117,6 +120,8 @@ type tracker struct {
rdb redis.UniversalClient
clock clockwork.Clock
region string
// See docs for [olaptables.RawUsage.BufferID]
bufferID string

flushLock interfaces.DistributedLock
stopFlush chan struct{}
@@ -160,6 +165,7 @@ func NewTracker(env environment.Env, clock clockwork.Clock, flushLock interfaces
clock: clock,
flushLock: flushLock,
stopFlush: make(chan struct{}),
bufferID: fmt.Sprintf("%s:redis", *region),
}, nil
}

@@ -301,6 +307,8 @@ func (ut *tracker) flushToDB(ctx context.Context) error {
ctx, cancel = context.WithDeadline(ctx, deadline.Add(-5*time.Second))
defer cancel()

var olapRows []*olaptables.RawUsage

// Loop through usage periods starting from the oldest period
// that may exist in Redis (based on key expiration time) and looping up until
// we hit a period which is not yet "settled".
@@ -353,9 +361,19 @@ func (ut *tracker) flushToDB(ctx context.Context) error {
return err
}
// Update counts in the DB
if err := ut.flushCounts(ctx, collection.GroupID, p, collection.UsageLabels(), counts); err != nil {
groupID := collection.GroupID
labels := collection.UsageLabels()
if err := ut.flushCountsToPrimaryDB(ctx, groupID, p, labels, counts); err != nil {
return err
}
// If OLAP writes are enabled, accumulate OLAP rows.
if *writeToOLAP {
olapRows = append(
olapRows,
toOLAPRows(ut.bufferID, groupID, p, labels, counts)...,
)
}

// Remove the collection data from Redis now that it has been
// flushed to the DB.
pipe := ut.rdb.TxPipeline()
@@ -366,10 +384,18 @@
}
}
}

// Flush any accumulated OLAP rows.
if len(olapRows) > 0 {
if err := ut.env.GetOLAPDBHandle().FlushUsages(ctx, olapRows); err != nil {
Contributor: what do you think about adding a check that GetOLAPDBHandle() isn't nil, so that we can return an error instead of panicking?
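A minimal sketch of the kind of guard being suggested (hypothetical placement and error message; the status helper name is assumed to exist in the util/status package alongside WrapError):

olapDBH := ut.env.GetOLAPDBHandle()
// Assumed guard: return a status error rather than panicking on a nil handle.
if olapDBH == nil {
	return status.FailedPreconditionError("OLAP usage writes are enabled but no OLAP DB handle is configured")
}
if err := olapDBH.FlushUsages(ctx, olapRows); err != nil {
	return status.WrapError(err, "flush OLAP usage records")
}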

return status.WrapError(err, "flush OLAP usage records")
}
}

return nil
}

func (ut *tracker) flushCounts(ctx context.Context, groupID string, p period, labels *tables.UsageLabels, counts *tables.UsageCounts) error {
func (ut *tracker) flushCountsToPrimaryDB(ctx context.Context, groupID string, p period, labels *tables.UsageLabels, counts *tables.UsageCounts) error {
dbh := ut.env.GetDBHandle()
return dbh.Transaction(ctx, func(tx interfaces.DB) error {
tu := &tables.Usage{
@@ -593,3 +619,134 @@ func stringMapToCounts(h map[string]string) (*tables.UsageCounts, error) {
MemoryGBUsec: hInt64["memory_gb_usec"],
}, nil
}

// toLabelMap converts primary DB labels to new OLAP labels.
func toLabelMap(labels *tables.UsageLabels) map[sku.LabelName]sku.LabelValue {
mapSize := 0
if labels.Origin != "" {
mapSize++
}
if labels.Client != "" {
mapSize++
}
if labels.Server != "" {
mapSize++
}
m := make(map[sku.LabelName]sku.LabelValue, mapSize)
if labels.Origin != "" {
m[sku.Origin] = sku.LabelValue(labels.Origin)
}
if labels.Client != "" {
m[sku.Client] = sku.LabelValue(labels.Client)
}
if labels.Server != "" {
m[sku.Server] = sku.LabelValue(labels.Server)
}
return m
}

// toOLAPRows converts primary DB usage rows to OLAP rows.
// TODO(bduffany): once we've fully turned on OLAP-based usage, we can delete
// this and use the OLAP schema directly.
func toOLAPRows(bufferID, groupID string, p period, labels *tables.UsageLabels, counts *tables.UsageCounts) []*olaptables.RawUsage {
var rows []*olaptables.RawUsage
baseRow := olaptables.RawUsage{
GroupID: groupID,
Labels: toLabelMap(labels),
PeriodStart: p.Start(),
BufferID: bufferID,
}
if counts.ActionCacheHits > 0 {
row := baseRow // copy
row.SKU = sku.RemoteCacheACHits
row.Count = counts.ActionCacheHits
rows = append(rows, &row)
}
if counts.CASCacheHits > 0 {
row := baseRow // copy
row.SKU = sku.RemoteCacheCASHits
row.Count = counts.CASCacheHits
rows = append(rows, &row)
}
if counts.Invocations > 0 {
row := baseRow // copy
row.SKU = sku.BuildEventsBESCount
row.Count = counts.Invocations
rows = append(rows, &row)
}
if counts.LinuxExecutionDurationUsec > 0 {
row := baseRow // copy
row.SKU = sku.RemoteExecutionExecuteWorkerDurationNanos
row.Count = counts.LinuxExecutionDurationUsec
row.Labels = appendExecutionLabels(row.Labels, sku.OSLinux, sku.SelfHostedFalse)
rows = append(rows, &row)
}
if counts.MacExecutionDurationUsec > 0 {
row := baseRow // copy
row.SKU = sku.RemoteExecutionExecuteWorkerDurationNanos
row.Count = counts.MacExecutionDurationUsec
row.Labels = appendExecutionLabels(row.Labels, sku.OSMac, sku.SelfHostedFalse)
rows = append(rows, &row)
}
if counts.SelfHostedLinuxExecutionDurationUsec > 0 {
row := baseRow // copy
row.SKU = sku.RemoteExecutionExecuteWorkerDurationNanos
row.Count = counts.SelfHostedLinuxExecutionDurationUsec
row.Labels = appendExecutionLabels(row.Labels, sku.OSLinux, sku.SelfHostedTrue)
rows = append(rows, &row)
}
if counts.SelfHostedMacExecutionDurationUsec > 0 {
row := baseRow // copy
row.SKU = sku.RemoteExecutionExecuteWorkerDurationNanos
row.Count = counts.SelfHostedMacExecutionDurationUsec
row.Labels = appendExecutionLabels(row.Labels, sku.OSMac, sku.SelfHostedTrue)
rows = append(rows, &row)
}
if counts.TotalDownloadSizeBytes > 0 {
row := baseRow // copy
row.SKU = sku.RemoteCacheCASDownloadedBytes
row.Count = counts.TotalDownloadSizeBytes
rows = append(rows, &row)
}
if counts.TotalUploadSizeBytes > 0 {
row := baseRow // copy
row.SKU = sku.RemoteCacheCASUploadedBytes
row.Count = counts.TotalUploadSizeBytes
rows = append(rows, &row)
}
if counts.TotalCachedActionExecUsec > 0 {
row := baseRow // copy
row.SKU = sku.RemoteExecutionExecuteWorkerDurationNanos
row.Count = counts.TotalCachedActionExecUsec
rows = append(rows, &row)
}
if counts.CPUNanos > 0 {
row := baseRow // copy
row.SKU = sku.RemoteExecutionExecuteWorkerCPUNanos
row.Count = counts.CPUNanos
// NOTE: we currently only report CPU usage for cloud Linux executors.
row.Labels = appendExecutionLabels(row.Labels, sku.OSLinux, sku.SelfHostedFalse)
rows = append(rows, &row)
}
if counts.MemoryGBUsec > 0 {
row := baseRow // copy
row.SKU = sku.RemoteExecutionExecuteWorkerMemoryGBNanos
row.Count = counts.MemoryGBUsec
// NOTE: we currently only report memory usage for cloud Linux executors.
row.Labels = appendExecutionLabels(row.Labels, sku.OSLinux, sku.SelfHostedFalse)
rows = append(rows, &row)
}
return rows
}

// appendExecutionLabels returns a new map which is a clone of the given map
// with the given OS and self-hosted labels applied.
func appendExecutionLabels(m map[sku.LabelName]sku.LabelValue, os, selfHosted sku.LabelValue) map[sku.LabelName]sku.LabelValue {
out := make(map[sku.LabelName]sku.LabelValue, len(m)+2)
for k, v := range m {
out[k] = v
}
out[sku.OS] = os
out[sku.SelfHosted] = selfHosted
return out
}