@@ -29,9 +29,7 @@ import (
2929 qpb "github.com/buildbuddy-io/buildbuddy/proto/quota"
3030)
3131
32- var (
33- quotaManagerEnabled = flag .Bool ("app.enable_quota_management" , false , "If set, quota management will be enabled" )
34- )
32+ var quotaManagerEnabled = flag .Bool ("app.enable_quota_management" , false , "If set, quota management will be enabled" )
3533
3634const (
3735 // The maximum number of attempts to update rate limit data in redis.
@@ -48,6 +46,9 @@ const (
4846 // when there is an update.
4947 pubSubChannelName = "quota-change-notifications"
5048
49+ // The names of the flagd experiments for quota management.
50+ bucketQuotaExperimentName = "quota.buckets"
51+
5152 namespaceSeperator = ":"
5253)
5354
@@ -84,6 +85,61 @@ func bucketToRow(namespace string, from *qpb.Bucket) *tables.QuotaBucket {
8485 return res
8586}
8687
88+ func bucketRowFromMap (namespace string , bucketMap map [string ]interface {}) (* tables.QuotaBucket , error ) {
89+ name , ok := bucketMap ["name" ].(string )
90+ if ! ok || name == "" {
91+ return nil , status .InvalidArgumentError ("bucket.name must be a non-empty string" )
92+ }
93+
94+ maxRateInterface , ok := bucketMap ["maxRate" ]
95+ if ! ok {
96+ return nil , status .InvalidArgumentError ("bucket.maxRate is required" )
97+ }
98+ maxRateMap , ok := maxRateInterface .(map [string ]interface {})
99+ if ! ok {
100+ return nil , status .InvalidArgumentError ("bucket.maxRate must be an object" )
101+ }
102+
103+ numRequests , err := interfaceToInt64 (maxRateMap ["numRequests" ])
104+ if err != nil || numRequests <= 0 {
105+ return nil , status .InvalidArgumentErrorf ("bucket.maxRate.numRequests must be a positive number: %s" , err )
106+ }
107+
108+ period , err := interfaceToInt64 (maxRateMap ["periodUsec" ])
109+ if err != nil {
110+ return nil , status .InvalidArgumentErrorf ("bucket.maxRate.periodUsec is invalid: %s" , err )
111+ }
112+ if period == 0 {
113+ return nil , status .InvalidArgumentError ("bucket.maxRate.periodUsec cannot be zero" )
114+ }
115+
116+ maxBurst , err := interfaceToInt64 (bucketMap ["maxBurst" ])
117+ if err != nil {
118+ return nil , status .InvalidArgumentErrorf ("bucket.maxBurst must be a number: %s" , err )
119+ }
120+
121+ return & tables.QuotaBucket {
122+ Namespace : namespace ,
123+ Name : name ,
124+ NumRequests : numRequests ,
125+ PeriodDurationUsec : period ,
126+ MaxBurst : maxBurst ,
127+ }, nil
128+ }
129+
130+ func interfaceToInt64 (v interface {}) (int64 , error ) {
131+ switch val := v .(type ) {
132+ case int64 :
133+ return val , nil
134+ case int :
135+ return int64 (val ), nil
136+ case float64 :
137+ return int64 (val ), nil
138+ default :
139+ return 0 , status .InvalidArgumentErrorf ("expected number type, got %T" , v )
140+ }
141+ }
142+
87143func namespaceConfigToProto (from * namespaceConfig ) * qpb.Namespace {
88144 res := & qpb.Namespace {
89145 Name : from .name ,
@@ -135,46 +191,45 @@ func fetchConfigFromDB(env environment.Env, namespace string) (map[string]*names
135191 namespace ,
136192 )
137193 }
138- err := db .ScanEach (
139- rq ,
140- func (ctx context.Context , qbg * struct {
141- tables.QuotaBucket
142- * tables.QuotaGroup
143- }) error {
144- qb := & qbg .QuotaBucket
145- ns , ok := config [qb .Namespace ]
146- if ! ok {
147- ns = & namespaceConfig {
148- name : qb .Namespace ,
149- assignedBuckets : make (map [string ]* assignedBucket ),
150- }
151- config [qb .Namespace ] = ns
152- }
153- bucket , ok := ns .assignedBuckets [qb .Name ]
154- if ! ok {
155- if err := validateBucket (bucketToProto (qb )); err != nil {
156- return status .InternalErrorf ("invalid bucket: %v" , qbg .QuotaBucket )
157- }
158- bucket = & assignedBucket {
159- bucket : & qbg .QuotaBucket ,
160- }
161- ns .assignedBuckets [qb .Name ] = bucket
162- }
163194
164- qg := qbg .QuotaGroup
165- if qg == nil {
166- // No Quota Group for this bucket
167- return nil
195+ type quotaBucketGroup struct {
196+ tables.QuotaBucket
197+ * tables.QuotaGroup
198+ }
199+
200+ if err := db .ScanEach (rq , func (ctx context.Context , qbg * quotaBucketGroup ) error {
201+ qb := & qbg .QuotaBucket
202+ ns , ok := config [qb .Namespace ]
203+ if ! ok {
204+ ns = & namespaceConfig {
205+ name : qb .Namespace ,
206+ assignedBuckets : make (map [string ]* assignedBucket ),
168207 }
169- if qg .BucketName == defaultBucketName {
170- log .Warningf ("Doesn't need to create QuotaGroup for default bucket in namespace %q" , qg .Namespace )
171- return nil
208+ config [qb .Namespace ] = ns
209+ }
210+ bucket , ok := ns .assignedBuckets [qb .Name ]
211+ if ! ok {
212+ if err := validateBucket (bucketToProto (qb )); err != nil {
213+ return status .InternalErrorf ("invalid bucket: %v" , qbg .QuotaBucket )
172214 }
173- bucket .quotaKeys = append (bucket .quotaKeys , qg .QuotaKey )
215+ bucket = & assignedBucket {
216+ bucket : & qbg .QuotaBucket ,
217+ }
218+ ns .assignedBuckets [qb .Name ] = bucket
219+ }
220+
221+ qg := qbg .QuotaGroup
222+ if qg == nil {
223+ // No Quota Group for this bucket
174224 return nil
175- },
176- )
177- if err != nil {
225+ }
226+ if qg .BucketName == defaultBucketName {
227+ log .Warningf ("Doesn't need to create QuotaGroup for default bucket in namespace %q" , qg .Namespace )
228+ return nil
229+ }
230+ bucket .quotaKeys = append (bucket .quotaKeys , qg .QuotaKey )
231+ return nil
232+ }); err != nil {
178233 return nil , status .InternalErrorf ("fetchConfigFromDB query failed: %s" , err )
179234 }
180235 return config , nil
@@ -319,6 +374,81 @@ func (qm *QuotaManager) createNamespace(env environment.Env, name string, config
319374 return ns , nil
320375}
321376
377+ // quota requirements can also be configured in flagd experiment config.
378+ //
379+ // Since flagd is configured by request context, we need to read and store the quota
380+ // config during the request.
381+ //
382+ // quota.buckets must have valid structure, for example:
383+ //
384+ // "rpc:/buildbuddy.service.BuildBuddyService/GetInvocation": {
385+ // "name": "custom",
386+ // "max_rate": {"num_requests": 100, "period": "1m"},
387+ // "max_burst": 10
388+ // }
389+ func (qm * QuotaManager ) loadQuotasFromFlagd (ctx context.Context , key , nsString string ) error {
390+ if qm .env .GetExperimentFlagProvider () == nil {
391+ return status .InternalError ("experiment flag provider not configured" )
392+ }
393+
394+ // If the bucket is already loaded, skip it.
395+ if ns , ok := qm .namespaces .Load (nsString ); ok {
396+ if _ , ok := ns .(* namespace ).bucketsByKey [key ]; ok {
397+ return nil
398+ }
399+ }
400+
401+ flagdBucketsConfig := qm .env .GetExperimentFlagProvider ().Object (ctx , bucketQuotaExperimentName , nil )
402+ if flagdBucketsConfig == nil {
403+ return nil
404+ }
405+
406+ // flagd will check the user/group for a "key", so we don't need to look it up again.
407+ keySpecificFlagdNamespaceConfig , ok := flagdBucketsConfig [nsString ]
408+ if ! ok {
409+ return nil
410+ }
411+ bucketMap , ok := keySpecificFlagdNamespaceConfig .(map [string ]interface {})
412+ if ! ok {
413+ return status .InvalidArgumentErrorf ("invalid quota.buckets config for namespace %q: expected object, got %T" , nsString , keySpecificFlagdNamespaceConfig )
414+ }
415+
416+ // Directly create bucket row from map to avoid expensive proto conversion on every request
417+ bucketRow , err := bucketRowFromMap (nsString , bucketMap )
418+ if err != nil {
419+ return status .InvalidArgumentErrorf ("failed to parse quota bucket config for namespace %q: %s" , nsString , err )
420+ }
421+
422+ // createNamespace will create the namespace if it doesn't exist, and merge the new bucket into the namespace
423+ ns , err := qm .createNamespace (qm .env , nsString , & namespaceConfig {
424+ name : nsString ,
425+ assignedBuckets : map [string ]* assignedBucket {key : {bucket : bucketRow , quotaKeys : []string {key }}},
426+ })
427+ if err != nil {
428+ return status .InternalErrorf ("failed to create namespace for namespace %q: %s" , nsString , err )
429+ }
430+ qm .mergeIntoNamespace (ns )
431+ return nil
432+ }
433+
434+ func (qm * QuotaManager ) mergeIntoNamespace (ns * namespace ) {
435+ existingNs , loaded := qm .namespaces .LoadOrStore (ns .name , ns )
436+ if ! loaded {
437+ return
438+ }
439+
440+ // keep existing config and bucketsByKey, only merge new buckets
441+ existingNs .(* namespace ).bucketsByKey = mergeMaps (ns .bucketsByKey , existingNs .(* namespace ).bucketsByKey )
442+ qm .namespaces .Store (ns .name , existingNs )
443+ }
444+
445+ func mergeMaps (to , from map [string ]Bucket ) map [string ]Bucket {
446+ for k , v := range from {
447+ to [k ] = v
448+ }
449+ return to
450+ }
451+
322452// findBucket finds the bucket given a namespace and key. If the key is found in
323453// bucketsByKey map, return the corresponding bucket. Otherwise, return the
324454// default bucket. Returns nil if the namespace is not found or the default bucket
@@ -339,13 +469,21 @@ func (qm *QuotaManager) findBucket(nsName string, key string) Bucket {
339469
340470func (qm * QuotaManager ) Allow (ctx context.Context , namespace string , quantity int64 ) error {
341471 key , err := quota .GetKey (ctx , qm .env )
342-
343472 if err != nil {
344473 metrics .QuotaKeyEmptyCount .With (prometheus.Labels {
345474 metrics .QuotaNamespace : namespace ,
346475 }).Inc ()
347476 return nil
348477 }
478+
479+ // Flagd experiments are configured from the request's context, so we load
480+ // these into the manager on the first key+namespace pair. Once these are loaded,
481+ // we'll skip loading them again.
482+ if err := qm .loadQuotasFromFlagd (ctx , key , namespace ); err != nil {
483+ log .CtxWarningf (ctx , "Failed to load quotas from flagd for %q: %s" , namespace , err )
484+ return err
485+ }
486+
349487 b := qm .findBucket (namespace , key )
350488 if b == nil {
351489 // The bucket is not found, b/c either the namespace or the default bucket
@@ -366,7 +504,7 @@ func (qm *QuotaManager) Allow(ctx context.Context, namespace string, quantity in
366504 metrics .QuotaNamespace : namespace ,
367505 metrics .QuotaKey : key ,
368506 }).Inc ()
369- return status .ResourceExhaustedErrorf ("quota exceeded for %q " , namespace )
507+ return status .ResourceExhaustedErrorf ("quota of namespace: %v %d requests in %v time period for key: %v exceeded " , namespace , b . Config (). NumRequests , b . Config (). PeriodDurationUsec , key )
370508 }
371509}
372510
@@ -570,7 +708,6 @@ func (qm *QuotaManager) reloadNamespaces() error {
570708 ns , err := qm .createNamespace (qm .env , nsName , nsConfig )
571709 if err != nil {
572710 return err
573-
574711 }
575712 qm .namespaces .Store (nsName , ns )
576713 }
0 commit comments