@@ -29,9 +29,7 @@ import (
2929 qpb "github.com/buildbuddy-io/buildbuddy/proto/quota"
3030)
3131
32- var (
33- quotaManagerEnabled = flag .Bool ("app.enable_quota_management" , false , "If set, quota management will be enabled" )
34- )
32+ var quotaManagerEnabled = flag .Bool ("app.enable_quota_management" , false , "If set, quota management will be enabled" )
3533
3634const (
3735 // The maximum number of attempts to update rate limit data in redis.
@@ -48,6 +46,10 @@ const (
4846 // when there is an update.
4947 pubSubChannelName = "quota-change-notifications"
5048
49+ // The names of the flagd experiments for quota management.
50+ blockedQuotaExperimentName = "quota.blocked"
51+ bucketQuotaExperimentName = "quota.buckets"
52+
5153 namespaceSeperator = ":"
5254)
5355
@@ -84,6 +86,61 @@ func bucketToRow(namespace string, from *qpb.Bucket) *tables.QuotaBucket {
8486 return res
8587}
8688
89+ func bucketRowFromMap (namespace string , bucketMap map [string ]interface {}) (* tables.QuotaBucket , error ) {
90+ name , ok := bucketMap ["name" ].(string )
91+ if ! ok || name == "" {
92+ return nil , status .InvalidArgumentError ("bucket.name must be a non-empty string" )
93+ }
94+
95+ maxRateInterface , ok := bucketMap ["max_rate" ]
96+ if ! ok {
97+ return nil , status .InvalidArgumentError ("bucket.max_rate is required" )
98+ }
99+ maxRateMap , ok := maxRateInterface .(map [string ]interface {})
100+ if ! ok {
101+ return nil , status .InvalidArgumentError ("bucket.max_rate must be an object" )
102+ }
103+
104+ numRequests , err := interfaceToInt64 (maxRateMap ["num_requests" ])
105+ if err != nil || numRequests <= 0 {
106+ return nil , status .InvalidArgumentErrorf ("bucket.max_rate.num_requests must be a positive number: %s" , err )
107+ }
108+
109+ period , err := interfaceToInt64 (maxRateMap ["period_usec" ])
110+ if err != nil {
111+ return nil , status .InvalidArgumentErrorf ("bucket.max_rate.period_usec is invalid: %s" , err )
112+ }
113+ if period == 0 {
114+ return nil , status .InvalidArgumentError ("bucket.max_rate.period cannot be zero" )
115+ }
116+
117+ maxBurst , err := interfaceToInt64 (bucketMap ["max_burst" ])
118+ if err != nil {
119+ return nil , status .InvalidArgumentErrorf ("bucket.max_burst must be a number: %s" , err )
120+ }
121+
122+ return & tables.QuotaBucket {
123+ Namespace : namespace ,
124+ Name : name ,
125+ NumRequests : numRequests ,
126+ PeriodDurationUsec : period ,
127+ MaxBurst : maxBurst ,
128+ }, nil
129+ }
130+
131+ func interfaceToInt64 (v interface {}) (int64 , error ) {
132+ switch val := v .(type ) {
133+ case int64 :
134+ return val , nil
135+ case int :
136+ return int64 (val ), nil
137+ case float64 :
138+ return int64 (val ), nil
139+ default :
140+ return 0 , status .InvalidArgumentErrorf ("expected number type, got %T" , v )
141+ }
142+ }
143+
87144func namespaceConfigToProto (from * namespaceConfig ) * qpb.Namespace {
88145 res := & qpb.Namespace {
89146 Name : from .name ,
@@ -140,7 +197,8 @@ func fetchConfigFromDB(env environment.Env, namespace string) (map[string]*names
140197 func (ctx context.Context , qbg * struct {
141198 tables.QuotaBucket
142199 * tables.QuotaGroup
143- }) error {
200+ },
201+ ) error {
144202 qb := & qbg .QuotaBucket
145203 ns , ok := config [qb .Namespace ]
146204 if ! ok {
@@ -200,8 +258,6 @@ func validateBucket(bucket *qpb.Bucket) error {
200258}
201259
202260type Bucket interface {
203- // Config returns a copy of the QuotaBucket. Used for testing.
204- Config () tables.QuotaBucket
205261 Allow (ctx context.Context , key string , quantity int64 ) (bool , error )
206262}
207263
@@ -319,6 +375,65 @@ func (qm *QuotaManager) createNamespace(env environment.Env, name string, config
319375 return ns , nil
320376}
321377
378+ // blockedBucket always denies requests. Used for blocking/banning users.
379+ type blockedBucket struct {}
380+
381+ func (b * blockedBucket ) Allow (ctx context.Context , key string , quantity int64 ) (bool , error ) {
382+ return false , nil
383+ }
384+
385+ // quota requirements can also be configured in flagd experiment config. user/groups
386+ // can either be blocked completely, or a bucket can be configured similar to MySQL.
387+ //
388+ // quota.buckets must have valid structure, for example:
389+ //
390+ // "rpc:/buildbuddy.service.BuildBuddyService/GetInvocation": {
391+ // "name": "custom",
392+ // "max_rate": {"num_requests": 100, "period": "1m"},
393+ // "max_burst": 10
394+ // }
395+ func (qm * QuotaManager ) readFlagdQuota (ctx context.Context , namespace string ) Bucket {
396+ if qm .env .GetExperimentFlagProvider () == nil {
397+ return nil
398+ }
399+
400+ if qm .env .GetExperimentFlagProvider ().Boolean (ctx , blockedQuotaExperimentName , false ) {
401+ return & blockedBucket {}
402+ }
403+
404+ bucketsConfig := qm .env .GetExperimentFlagProvider ().Object (ctx , bucketQuotaExperimentName , nil )
405+ if bucketsConfig == nil {
406+ return nil
407+ }
408+ namespaceConfig , ok := bucketsConfig [namespace ]
409+ if ! ok {
410+ return nil
411+ }
412+ bucketMap , ok := namespaceConfig .(map [string ]interface {})
413+ if ! ok {
414+ log .CtxWarningf (ctx , "Invalid quota.buckets config for namespace %q: expected object, got %T" , namespace , namespaceConfig )
415+ return nil
416+ }
417+
418+ // Directly create bucket row from map to avoid expensive proto conversion on every request
419+ bucketRow , err := bucketRowFromMap (namespace , bucketMap )
420+ if err != nil {
421+ log .CtxWarningf (ctx , "Failed to parse quota bucket config for namespace %q: %s" , namespace , err )
422+ return nil
423+ }
424+
425+ // TODO: cache created buckets to avoid recreating on every request. But this will only
426+ // happen if the group matches and the namespace matches, so we should only create
427+ // the bucket when we need to limit it.
428+ bucket , err := qm .bucketCreator (qm .env , bucketRow )
429+ if err != nil {
430+ log .CtxWarningf (ctx , "Failed to create quota bucket for namespace %q: %s" , namespace , err )
431+ return nil
432+ }
433+
434+ return bucket
435+ }
436+
322437// findBucket finds the bucket given a namespace and key. If the key is found in
323438// bucketsByKey map, return the corresponding bucket. Otherwise, return the
324439// default bucket. Returns nil if the namespace is not found or the default bucket
@@ -339,13 +454,31 @@ func (qm *QuotaManager) findBucket(nsName string, key string) Bucket {
339454
340455func (qm * QuotaManager ) Allow (ctx context.Context , namespace string , quantity int64 ) error {
341456 key , err := quota .GetKey (ctx , qm .env )
342-
343457 if err != nil {
344458 metrics .QuotaKeyEmptyCount .With (prometheus.Labels {
345459 metrics .QuotaNamespace : namespace ,
346460 }).Inc ()
347461 return nil
348462 }
463+
464+ flagdBucket := qm .readFlagdQuota (ctx , namespace )
465+ if flagdBucket != nil {
466+ allow , err := flagdBucket .Allow (ctx , key , quantity )
467+ if err != nil {
468+ log .CtxWarningf (ctx , "Flagd quota check for %q failed: %s" , namespace , err )
469+ // There is some error when determining whether the request should be
470+ // allowed. Do not block the traffic when the quota system has issues.
471+ } else if ! allow {
472+ metrics .QuotaExceeded .With (prometheus.Labels {
473+ metrics .QuotaNamespace : namespace ,
474+ metrics .QuotaKey : key ,
475+ metrics .QuotaSource : "flagd" ,
476+ }).Inc ()
477+ return status .ResourceExhaustedErrorf ("quota exceeded for %q" , namespace )
478+ }
479+ }
480+
481+ // Also check DB based quota check.
349482 b := qm .findBucket (namespace , key )
350483 if b == nil {
351484 // The bucket is not found, b/c either the namespace or the default bucket
@@ -354,7 +487,7 @@ func (qm *QuotaManager) Allow(ctx context.Context, namespace string, quantity in
354487 }
355488 allow , err := b .Allow (ctx , key , quantity )
356489 if err != nil {
357- log .CtxWarningf (ctx , "Quota check for %q failed: %s" , namespace , err )
490+ log .CtxWarningf (ctx , "MySQL quota check for %q failed: %s" , namespace , err )
358491 // There is some error when determining whether the request should be
359492 // allowed. Do not block the traffic when the quota system has issues.
360493 return nil
@@ -365,6 +498,7 @@ func (qm *QuotaManager) Allow(ctx context.Context, namespace string, quantity in
365498 metrics .QuotaExceeded .With (prometheus.Labels {
366499 metrics .QuotaNamespace : namespace ,
367500 metrics .QuotaKey : key ,
501+ metrics .QuotaSource : "mysql" ,
368502 }).Inc ()
369503 return status .ResourceExhaustedErrorf ("quota exceeded for %q" , namespace )
370504 }
@@ -570,7 +704,6 @@ func (qm *QuotaManager) reloadNamespaces() error {
570704 ns , err := qm .createNamespace (qm .env , nsName , nsConfig )
571705 if err != nil {
572706 return err
573-
574707 }
575708 qm .namespaces .Store (nsName , ns )
576709 }
0 commit comments