@@ -32,7 +32,7 @@ import (
3232
3333var (
3434 region = flag .String ("app.region" , "" , "The region in which the app is running." )
35- writeToOLAP = flag .Bool ("app.write_usage_to_olap_db" , false , "If true, write usage data to OLAP DB." )
35+ writeToOLAP = flag .Bool ("app.write_usage_to_olap_db" , false , "If true, write usage data to OLAP DB in addition to the primary DB write ." )
3636)
3737
3838const (
@@ -307,6 +307,8 @@ func (ut *tracker) flushToDB(ctx context.Context) error {
307307 ctx , cancel = context .WithDeadline (ctx , deadline .Add (- 5 * time .Second ))
308308 defer cancel ()
309309
310+ var olapRows []* olaptables.RawUsage
311+
310312 // Loop through usage periods starting from the oldest period
311313 // that may exist in Redis (based on key expiration time) and looping up until
312314 // we hit a period which is not yet "settled".
@@ -359,12 +361,19 @@ func (ut *tracker) flushToDB(ctx context.Context) error {
359361 return err
360362 }
361363 // Update counts in the DB
362- // TODO: for ClickHouse, we should flush everything in a single
363- // query, so that we don't have to rely on async_insert being
364- // enabled in order to properly batch the inserts.
365- if err := ut .flushCounts (ctx , collection .GroupID , p , collection .UsageLabels (), counts ); err != nil {
364+ groupID := collection .GroupID
365+ labels := collection .UsageLabels ()
366+ if err := ut .flushCountsToPrimaryDB (ctx , groupID , p , labels , counts ); err != nil {
366367 return err
367368 }
369+ // If OLAP writes are enabled, accumulate OLAP rows.
370+ if * writeToOLAP {
371+ olapRows = append (
372+ olapRows ,
373+ toOLAPRows (groupID , p , labels , counts )... ,
374+ )
375+ }
376+
368377 // Remove the collection data from Redis now that it has been
369378 // flushed to the DB.
370379 pipe := ut .rdb .TxPipeline ()
@@ -375,115 +384,14 @@ func (ut *tracker) flushToDB(ctx context.Context) error {
375384 }
376385 }
377386 }
378- return nil
379- }
380387
381- func (ut * tracker ) flushCounts (ctx context.Context , groupID string , p period , labels * tables.UsageLabels , counts * tables.UsageCounts ) error {
382- if err := ut .flushCountsToPrimaryDB (ctx , groupID , p , labels , counts ); err != nil {
383- return err
384- }
385- if * writeToOLAP {
386- if err := ut .flushCountsToOLAPDB (ctx , groupID , p , labels , counts ); err != nil {
387- return err
388+ // Flush the accumulated OLAP rows in a single call.
389+ if len (olapRows ) > 0 {
390+ if err := ut .env .GetOLAPDBHandle ().FlushUsages (ctx , olapRows ); err != nil {
391+ return status .WrapError (err , "flush OLAP usage records" )
388392 }
389393 }
390- return nil
391- }
392394
393- func (ut * tracker ) flushCountsToOLAPDB (ctx context.Context , groupID string , p period , labels * tables.UsageLabels , counts * tables.UsageCounts ) error {
394- var rows []* olaptables.RawUsage
395- baseRow := olaptables.RawUsage {
396- GroupID : groupID ,
397- Labels : toLabelMap (labels ),
398- PeriodStart : p .Start (),
399- BufferID : ut .bufferID ,
400- }
401- if counts .ActionCacheHits > 0 {
402- row := baseRow // copy
403- row .SKU = sku .RemoteCacheACHits
404- row .Count = counts .ActionCacheHits
405- rows = append (rows , & row )
406- }
407- if counts .CASCacheHits > 0 {
408- row := baseRow // copy
409- row .SKU = sku .RemoteCacheCASHits
410- row .Count = counts .CASCacheHits
411- rows = append (rows , & row )
412- }
413- if counts .Invocations > 0 {
414- row := baseRow // copy
415- row .SKU = sku .BuildEventsBESCount
416- row .Count = counts .Invocations
417- rows = append (rows , & row )
418- }
419- if counts .LinuxExecutionDurationUsec > 0 {
420- row := baseRow // copy
421- row .SKU = sku .RemoteExecutionExecuteWorkerDurationNanos
422- row .Count = counts .LinuxExecutionDurationUsec
423- row .Labels = appendExecutionLabels (row .Labels , sku .OSLinux , sku .SelfHostedFalse )
424- rows = append (rows , & row )
425- }
426- if counts .MacExecutionDurationUsec > 0 {
427- row := baseRow // copy
428- row .SKU = sku .RemoteExecutionExecuteWorkerDurationNanos
429- row .Count = counts .MacExecutionDurationUsec
430- row .Labels = appendExecutionLabels (row .Labels , sku .OSMac , sku .SelfHostedFalse )
431- rows = append (rows , & row )
432- }
433- if counts .SelfHostedLinuxExecutionDurationUsec > 0 {
434- row := baseRow // copy
435- row .SKU = sku .RemoteExecutionExecuteWorkerDurationNanos
436- row .Count = counts .SelfHostedLinuxExecutionDurationUsec
437- row .Labels = appendExecutionLabels (row .Labels , sku .OSLinux , sku .SelfHostedTrue )
438- rows = append (rows , & row )
439- }
440- if counts .SelfHostedMacExecutionDurationUsec > 0 {
441- row := baseRow // copy
442- row .SKU = sku .RemoteExecutionExecuteWorkerDurationNanos
443- row .Count = counts .SelfHostedMacExecutionDurationUsec
444- row .Labels = appendExecutionLabels (row .Labels , sku .OSMac , sku .SelfHostedTrue )
445- rows = append (rows , & row )
446- }
447- if counts .TotalDownloadSizeBytes > 0 {
448- row := baseRow // copy
449- row .SKU = sku .RemoteCacheCASDownloadedBytes
450- row .Count = counts .TotalDownloadSizeBytes
451- rows = append (rows , & row )
452- }
453- if counts .TotalUploadSizeBytes > 0 {
454- row := baseRow // copy
455- row .SKU = sku .RemoteCacheCASUploadedBytes
456- row .Count = counts .TotalUploadSizeBytes
457- rows = append (rows , & row )
458- }
459- if counts .TotalCachedActionExecUsec > 0 {
460- row := baseRow // copy
461- row .SKU = sku .RemoteExecutionExecuteWorkerDurationNanos
462- row .Count = counts .TotalCachedActionExecUsec
463- rows = append (rows , & row )
464- }
465- if counts .CPUNanos > 0 {
466- row := baseRow // copy
467- row .SKU = sku .RemoteExecutionExecuteWorkerCPUNanos
468- row .Count = counts .CPUNanos
469- // NOTE: we currently only report CPU usage for cloud Linux executors.
470- row .Labels = appendExecutionLabels (row .Labels , sku .OSLinux , sku .SelfHostedFalse )
471- rows = append (rows , & row )
472- }
473- if counts .MemoryGBUsec > 0 {
474- row := baseRow // copy
475- row .SKU = sku .RemoteExecutionExecuteWorkerMemoryGBNanos
476- row .Count = counts .MemoryGBUsec
477- // NOTE: we currently only report memory usage for cloud Linux executors.
478- row .Labels = appendExecutionLabels (row .Labels , sku .OSLinux , sku .SelfHostedFalse )
479- rows = append (rows , & row )
480- }
481- if len (rows ) == 0 {
482- return nil
483- }
484- if err := ut .env .GetOLAPDBHandle ().FlushUsages (ctx , rows ); err != nil {
485- return fmt .Errorf ("insert OLAP usage records: %w" , err )
486- }
487395 return nil
488396}
489397
@@ -737,7 +645,102 @@ func toLabelMap(labels *tables.UsageLabels) map[sku.LabelName]sku.LabelValue {
737645 return m
738646}
739647
740- // appendEntry creates a copy of the given map and appends the given entry.
648+ // toOLAPRows converts primary DB usage rows to OLAP rows.
649+ // TODO(bduffany): once we've fully turned on OLAP-based usage, we can delete
650+ // this and use the OLAP schema directly.
651+ func toOLAPRows (bufferID , groupID string , p period , labels * tables.UsageLabels , counts * tables.UsageCounts ) []* olaptables.RawUsage {
652+ var rows []* olaptables.RawUsage
653+ baseRow := olaptables.RawUsage {
654+ GroupID : groupID ,
655+ Labels : toLabelMap (labels ),
656+ PeriodStart : p .Start (),
657+ BufferID : bufferID ,
658+ }
659+ if counts .ActionCacheHits > 0 {
660+ row := baseRow // copy
661+ row .SKU = sku .RemoteCacheACHits
662+ row .Count = counts .ActionCacheHits
663+ rows = append (rows , & row )
664+ }
665+ if counts .CASCacheHits > 0 {
666+ row := baseRow // copy
667+ row .SKU = sku .RemoteCacheCASHits
668+ row .Count = counts .CASCacheHits
669+ rows = append (rows , & row )
670+ }
671+ if counts .Invocations > 0 {
672+ row := baseRow // copy
673+ row .SKU = sku .BuildEventsBESCount
674+ row .Count = counts .Invocations
675+ rows = append (rows , & row )
676+ }
677+ if counts .LinuxExecutionDurationUsec > 0 {
678+ row := baseRow // copy
679+ row .SKU = sku .RemoteExecutionExecuteWorkerDurationNanos
680+ row .Count = counts .LinuxExecutionDurationUsec
681+ row .Labels = appendExecutionLabels (row .Labels , sku .OSLinux , sku .SelfHostedFalse )
682+ rows = append (rows , & row )
683+ }
684+ if counts .MacExecutionDurationUsec > 0 {
685+ row := baseRow // copy
686+ row .SKU = sku .RemoteExecutionExecuteWorkerDurationNanos
687+ row .Count = counts .MacExecutionDurationUsec
688+ row .Labels = appendExecutionLabels (row .Labels , sku .OSMac , sku .SelfHostedFalse )
689+ rows = append (rows , & row )
690+ }
691+ if counts .SelfHostedLinuxExecutionDurationUsec > 0 {
692+ row := baseRow // copy
693+ row .SKU = sku .RemoteExecutionExecuteWorkerDurationNanos
694+ row .Count = counts .SelfHostedLinuxExecutionDurationUsec
695+ row .Labels = appendExecutionLabels (row .Labels , sku .OSLinux , sku .SelfHostedTrue )
696+ rows = append (rows , & row )
697+ }
698+ if counts .SelfHostedMacExecutionDurationUsec > 0 {
699+ row := baseRow // copy
700+ row .SKU = sku .RemoteExecutionExecuteWorkerDurationNanos
701+ row .Count = counts .SelfHostedMacExecutionDurationUsec
702+ row .Labels = appendExecutionLabels (row .Labels , sku .OSMac , sku .SelfHostedTrue )
703+ rows = append (rows , & row )
704+ }
705+ if counts .TotalDownloadSizeBytes > 0 {
706+ row := baseRow // copy
707+ row .SKU = sku .RemoteCacheCASDownloadedBytes
708+ row .Count = counts .TotalDownloadSizeBytes
709+ rows = append (rows , & row )
710+ }
711+ if counts .TotalUploadSizeBytes > 0 {
712+ row := baseRow // copy
713+ row .SKU = sku .RemoteCacheCASUploadedBytes
714+ row .Count = counts .TotalUploadSizeBytes
715+ rows = append (rows , & row )
716+ }
717+ if counts .TotalCachedActionExecUsec > 0 {
718+ row := baseRow // copy
719+ row .SKU = sku .RemoteExecutionExecuteWorkerDurationNanos
720+ row .Count = counts .TotalCachedActionExecUsec
721+ rows = append (rows , & row )
722+ }
723+ if counts .CPUNanos > 0 {
724+ row := baseRow // copy
725+ row .SKU = sku .RemoteExecutionExecuteWorkerCPUNanos
726+ row .Count = counts .CPUNanos
727+ // NOTE: we currently only report CPU usage for cloud Linux executors.
728+ row .Labels = appendExecutionLabels (row .Labels , sku .OSLinux , sku .SelfHostedFalse )
729+ rows = append (rows , & row )
730+ }
731+ if counts .MemoryGBUsec > 0 {
732+ row := baseRow // copy
733+ row .SKU = sku .RemoteExecutionExecuteWorkerMemoryGBNanos
734+ row .Count = counts .MemoryGBUsec
735+ // NOTE: we currently only report memory usage for cloud Linux executors.
736+ row .Labels = appendExecutionLabels (row .Labels , sku .OSLinux , sku .SelfHostedFalse )
737+ rows = append (rows , & row )
738+ }
739+ return rows
740+ }
741+
742+ // appendExecutionLabels returns a new map which is a clone of the given map
743+ // with the given OS and self-hosted labels applied.
741744func appendExecutionLabels (m map [sku.LabelName ]sku.LabelValue , os , selfHosted sku.LabelValue ) map [sku.LabelName ]sku.LabelValue {
742745 out := make (map [sku.LabelName ]sku.LabelValue , len (m )+ 2 )
743746 for k , v := range m {
0 commit comments