@@ -29,9 +29,7 @@ import (
2929 qpb "github.com/buildbuddy-io/buildbuddy/proto/quota"
3030)
3131
32- var (
33- quotaManagerEnabled = flag .Bool ("app.enable_quota_management" , false , "If set, quota management will be enabled" )
34- )
32+ var quotaManagerEnabled = flag .Bool ("app.enable_quota_management" , false , "If set, quota management will be enabled" )
3533
3634const (
3735 // The maximum number of attempts to update rate limit data in redis.
@@ -48,6 +46,10 @@ const (
4846 // when there is an update.
4947 pubSubChannelName = "quota-change-notifications"
5048
49+ // The names of the flagd experiments for quota management.
50+ blockedQuotaExperimentName = "quota.blocked"
51+ bucketQuotaExperimentName = "quota.buckets"
52+
5153 namespaceSeperator = ":"
5254)
5355
@@ -84,6 +86,61 @@ func bucketToRow(namespace string, from *qpb.Bucket) *tables.QuotaBucket {
8486 return res
8587}
8688
89+ func bucketRowFromMap (namespace string , bucketMap map [string ]interface {}) (* tables.QuotaBucket , error ) {
90+ name , ok := bucketMap ["name" ].(string )
91+ if ! ok || name == "" {
92+ return nil , status .InvalidArgumentError ("bucket.name must be a non-empty string" )
93+ }
94+
95+ maxRateInterface , ok := bucketMap ["max_rate" ]
96+ if ! ok {
97+ return nil , status .InvalidArgumentError ("bucket.max_rate is required" )
98+ }
99+ maxRateMap , ok := maxRateInterface .(map [string ]interface {})
100+ if ! ok {
101+ return nil , status .InvalidArgumentError ("bucket.max_rate must be an object" )
102+ }
103+
104+ numRequests , err := interfaceToInt64 (maxRateMap ["num_requests" ])
105+ if err != nil || numRequests <= 0 {
106+ return nil , status .InvalidArgumentErrorf ("bucket.max_rate.num_requests must be a positive number: %s" , err )
107+ }
108+
109+ period , err := interfaceToInt64 (maxRateMap ["period_usec" ])
110+ if err != nil {
111+ return nil , status .InvalidArgumentErrorf ("bucket.max_rate.period_usec is invalid: %s" , err )
112+ }
113+ if period == 0 {
114+ return nil , status .InvalidArgumentError ("bucket.max_rate.period cannot be zero" )
115+ }
116+
117+ maxBurst , err := interfaceToInt64 (bucketMap ["max_burst" ])
118+ if err != nil {
119+ return nil , status .InvalidArgumentErrorf ("bucket.max_burst must be a number: %s" , err )
120+ }
121+
122+ return & tables.QuotaBucket {
123+ Namespace : namespace ,
124+ Name : name ,
125+ NumRequests : numRequests ,
126+ PeriodDurationUsec : period ,
127+ MaxBurst : maxBurst ,
128+ }, nil
129+ }
130+
131+ func interfaceToInt64 (v interface {}) (int64 , error ) {
132+ switch val := v .(type ) {
133+ case int64 :
134+ return val , nil
135+ case int :
136+ return int64 (val ), nil
137+ case float64 :
138+ return int64 (val ), nil
139+ default :
140+ return 0 , status .InvalidArgumentErrorf ("expected number type, got %T" , v )
141+ }
142+ }
143+
87144func namespaceConfigToProto (from * namespaceConfig ) * qpb.Namespace {
88145 res := & qpb.Namespace {
89146 Name : from .name ,
@@ -135,46 +192,45 @@ func fetchConfigFromDB(env environment.Env, namespace string) (map[string]*names
135192 namespace ,
136193 )
137194 }
138- err := db .ScanEach (
139- rq ,
140- func (ctx context.Context , qbg * struct {
141- tables.QuotaBucket
142- * tables.QuotaGroup
143- }) error {
144- qb := & qbg .QuotaBucket
145- ns , ok := config [qb .Namespace ]
146- if ! ok {
147- ns = & namespaceConfig {
148- name : qb .Namespace ,
149- assignedBuckets : make (map [string ]* assignedBucket ),
150- }
151- config [qb .Namespace ] = ns
152- }
153- bucket , ok := ns .assignedBuckets [qb .Name ]
154- if ! ok {
155- if err := validateBucket (bucketToProto (qb )); err != nil {
156- return status .InternalErrorf ("invalid bucket: %v" , qbg .QuotaBucket )
157- }
158- bucket = & assignedBucket {
159- bucket : & qbg .QuotaBucket ,
160- }
161- ns .assignedBuckets [qb .Name ] = bucket
162- }
163195
164- qg := qbg .QuotaGroup
165- if qg == nil {
166- // No Quota Group for this bucket
167- return nil
196+ type quotaBucketGroup struct {
197+ tables.QuotaBucket
198+ * tables.QuotaGroup
199+ }
200+
201+ if err := db .ScanEach (rq , func (ctx context.Context , qbg * quotaBucketGroup ) error {
202+ qb := & qbg .QuotaBucket
203+ ns , ok := config [qb .Namespace ]
204+ if ! ok {
205+ ns = & namespaceConfig {
206+ name : qb .Namespace ,
207+ assignedBuckets : make (map [string ]* assignedBucket ),
168208 }
169- if qg .BucketName == defaultBucketName {
170- log .Warningf ("Doesn't need to create QuotaGroup for default bucket in namespace %q" , qg .Namespace )
171- return nil
209+ config [qb .Namespace ] = ns
210+ }
211+ bucket , ok := ns .assignedBuckets [qb .Name ]
212+ if ! ok {
213+ if err := validateBucket (bucketToProto (qb )); err != nil {
214+ return status .InternalErrorf ("invalid bucket: %v" , qbg .QuotaBucket )
172215 }
173- bucket .quotaKeys = append (bucket .quotaKeys , qg .QuotaKey )
216+ bucket = & assignedBucket {
217+ bucket : & qbg .QuotaBucket ,
218+ }
219+ ns .assignedBuckets [qb .Name ] = bucket
220+ }
221+
222+ qg := qbg .QuotaGroup
223+ if qg == nil {
224+ // No Quota Group for this bucket
174225 return nil
175- },
176- )
177- if err != nil {
226+ }
227+ if qg .BucketName == defaultBucketName {
228+ log .Warningf ("Doesn't need to create QuotaGroup for default bucket in namespace %q" , qg .Namespace )
229+ return nil
230+ }
231+ bucket .quotaKeys = append (bucket .quotaKeys , qg .QuotaKey )
232+ return nil
233+ }); err != nil {
178234 return nil , status .InternalErrorf ("fetchConfigFromDB query failed: %s" , err )
179235 }
180236 return config , nil
@@ -200,8 +256,6 @@ func validateBucket(bucket *qpb.Bucket) error {
200256}
201257
202258type Bucket interface {
203- // Config returns a copy of the QuotaBucket. Used for testing.
204- Config () tables.QuotaBucket
205259 Allow (ctx context.Context , key string , quantity int64 ) (bool , error )
206260}
207261
@@ -319,6 +373,65 @@ func (qm *QuotaManager) createNamespace(env environment.Env, name string, config
319373 return ns , nil
320374}
321375
376+ // blockedBucket always denies requests. Used for blocking/banning users.
377+ type blockedBucket struct {}
378+
379+ func (b * blockedBucket ) Allow (ctx context.Context , key string , quantity int64 ) (bool , error ) {
380+ return false , nil
381+ }
382+
383+ // quota requirements can also be configured in flagd experiment config. user/groups
384+ // can either be blocked completely, or a bucket can be configured similar to MySQL.
385+ //
386+ // quota.buckets must have valid structure, for example:
387+ //
388+ // "rpc:/buildbuddy.service.BuildBuddyService/GetInvocation": {
389+ // "name": "custom",
390+ // "max_rate": {"num_requests": 100, "period": "1m"},
391+ // "max_burst": 10
392+ // }
393+ func (qm * QuotaManager ) readFlagdQuota (ctx context.Context , namespace string ) Bucket {
394+ if qm .env .GetExperimentFlagProvider () == nil {
395+ return nil
396+ }
397+
398+ if qm .env .GetExperimentFlagProvider ().Boolean (ctx , blockedQuotaExperimentName , false ) {
399+ return & blockedBucket {}
400+ }
401+
402+ bucketsConfig := qm .env .GetExperimentFlagProvider ().Object (ctx , bucketQuotaExperimentName , nil )
403+ if bucketsConfig == nil {
404+ return nil
405+ }
406+ namespaceConfig , ok := bucketsConfig [namespace ]
407+ if ! ok {
408+ return nil
409+ }
410+ bucketMap , ok := namespaceConfig .(map [string ]interface {})
411+ if ! ok {
412+ log .CtxWarningf (ctx , "Invalid quota.buckets config for namespace %q: expected object, got %T" , namespace , namespaceConfig )
413+ return nil
414+ }
415+
416+ // Directly create bucket row from map to avoid expensive proto conversion on every request
417+ bucketRow , err := bucketRowFromMap (namespace , bucketMap )
418+ if err != nil {
419+ log .CtxWarningf (ctx , "Failed to parse quota bucket config for namespace %q: %s" , namespace , err )
420+ return nil
421+ }
422+
423+ // TODO: cache created buckets to avoid recreating on every request. But this will only
424+ // happen if the group matches and the namespace matches, so we should only create
425+ // the bucket when we need to limit it.
426+ bucket , err := qm .bucketCreator (qm .env , bucketRow )
427+ if err != nil {
428+ log .CtxWarningf (ctx , "Failed to create quota bucket for namespace %q: %s" , namespace , err )
429+ return nil
430+ }
431+
432+ return bucket
433+ }
434+
322435// findBucket finds the bucket given a namespace and key. If the key is found in
323436// bucketsByKey map, return the corresponding bucket. Otherwise, return the
324437// default bucket. Returns nil if the namespace is not found or the default bucket
@@ -339,13 +452,31 @@ func (qm *QuotaManager) findBucket(nsName string, key string) Bucket {
339452
340453func (qm * QuotaManager ) Allow (ctx context.Context , namespace string , quantity int64 ) error {
341454 key , err := quota .GetKey (ctx , qm .env )
342-
343455 if err != nil {
344456 metrics .QuotaKeyEmptyCount .With (prometheus.Labels {
345457 metrics .QuotaNamespace : namespace ,
346458 }).Inc ()
347459 return nil
348460 }
461+
462+ flagdBucket := qm .readFlagdQuota (ctx , namespace )
463+ if flagdBucket != nil {
464+ allow , err := flagdBucket .Allow (ctx , key , quantity )
465+ if err != nil {
466+ log .CtxWarningf (ctx , "Flagd quota check for %q failed: %s" , namespace , err )
467+ // There is some error when determining whether the request should be
468+ // allowed. Do not block the traffic when the quota system has issues.
469+ } else if ! allow {
470+ metrics .QuotaExceeded .With (prometheus.Labels {
471+ metrics .QuotaNamespace : namespace ,
472+ metrics .QuotaKey : key ,
473+ metrics .QuotaSource : "flagd" ,
474+ }).Inc ()
475+ return status .ResourceExhaustedErrorf ("quota exceeded for %q" , namespace )
476+ }
477+ }
478+
479+ // Also check DB based quota check.
349480 b := qm .findBucket (namespace , key )
350481 if b == nil {
351482 // The bucket is not found, b/c either the namespace or the default bucket
@@ -354,7 +485,7 @@ func (qm *QuotaManager) Allow(ctx context.Context, namespace string, quantity in
354485 }
355486 allow , err := b .Allow (ctx , key , quantity )
356487 if err != nil {
357- log .CtxWarningf (ctx , "Quota check for %q failed: %s" , namespace , err )
488+ log .CtxWarningf (ctx , "MySQL quota check for %q failed: %s" , namespace , err )
358489 // There is some error when determining whether the request should be
359490 // allowed. Do not block the traffic when the quota system has issues.
360491 return nil
@@ -365,6 +496,7 @@ func (qm *QuotaManager) Allow(ctx context.Context, namespace string, quantity in
365496 metrics .QuotaExceeded .With (prometheus.Labels {
366497 metrics .QuotaNamespace : namespace ,
367498 metrics .QuotaKey : key ,
499+ metrics .QuotaSource : "mysql" ,
368500 }).Inc ()
369501 return status .ResourceExhaustedErrorf ("quota exceeded for %q" , namespace )
370502 }
@@ -570,7 +702,6 @@ func (qm *QuotaManager) reloadNamespaces() error {
570702 ns , err := qm .createNamespace (qm .env , nsName , nsConfig )
571703 if err != nil {
572704 return err
573-
574705 }
575706 qm .namespaces .Store (nsName , ns )
576707 }
0 commit comments