Skip to content

Commit 01f70df

Browse files
committed
add zookeeper discovery
Signed-off-by: dongjiang1989 <dongjiang1989@126.com>
1 parent 7816e92 commit 01f70df

File tree

4 files changed

+408
-1
lines changed

4 files changed

+408
-1
lines changed

Makefile

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -281,14 +281,15 @@ install: runtime
281281

282282
$(ENV_INSTALL) -d $(ENV_INST_LUADIR)/apisix/discovery
283283
$(ENV_INSTALL) apisix/discovery/*.lua $(ENV_INST_LUADIR)/apisix/discovery/
284-
$(ENV_INSTALL) -d $(ENV_INST_LUADIR)/apisix/discovery/{consul,consul_kv,dns,eureka,nacos,kubernetes,tars}
284+
$(ENV_INSTALL) -d $(ENV_INST_LUADIR)/apisix/discovery/{consul,consul_kv,dns,eureka,nacos,kubernetes,tars,zookeeper}
285285
$(ENV_INSTALL) apisix/discovery/consul/*.lua $(ENV_INST_LUADIR)/apisix/discovery/consul
286286
$(ENV_INSTALL) apisix/discovery/consul_kv/*.lua $(ENV_INST_LUADIR)/apisix/discovery/consul_kv
287287
$(ENV_INSTALL) apisix/discovery/dns/*.lua $(ENV_INST_LUADIR)/apisix/discovery/dns
288288
$(ENV_INSTALL) apisix/discovery/eureka/*.lua $(ENV_INST_LUADIR)/apisix/discovery/eureka
289289
$(ENV_INSTALL) apisix/discovery/kubernetes/*.lua $(ENV_INST_LUADIR)/apisix/discovery/kubernetes
290290
$(ENV_INSTALL) apisix/discovery/nacos/*.lua $(ENV_INST_LUADIR)/apisix/discovery/nacos
291291
$(ENV_INSTALL) apisix/discovery/tars/*.lua $(ENV_INST_LUADIR)/apisix/discovery/tars
292+
$(ENV_INSTALL) apisix/discovery/zookeeper/*.lua $(ENV_INST_LUADIR)/apisix/discovery/zookeeper
292293

293294
$(ENV_INSTALL) -d $(ENV_INST_LUADIR)/apisix/http
294295
$(ENV_INSTALL) apisix/http/*.lua $(ENV_INST_LUADIR)/apisix/http/
Lines changed: 208 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,208 @@
1+
--
2+
-- Licensed to the Apache Software Foundation (ASF) under one or more
3+
-- contributor license agreements. See the NOTICE file distributed with
4+
-- this work for additional information regarding copyright ownership.
5+
-- The ASF licenses this file to You under the Apache License, Version 2.0
6+
-- (the "License"); you may not use this file except in compliance with
7+
-- the License. You may obtain a copy of the License at
8+
--
9+
-- http://www.apache.org/licenses/LICENSE-2.0
10+
--
11+
-- Unless required by applicable law or agreed to in writing, software
12+
-- distributed under the License is distributed on an "AS IS" BASIS,
13+
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
-- See the License for the specific language governing permissions and
15+
-- limitations under the License.
16+
--
17+
18+
local core = require("apisix.core")
19+
local utils = require("apisix.discovery.zookeeper.utils")
20+
local schema = require("apisix.discovery.zookeeper.schema")
21+
local ngx = ngx
22+
local ipairs = ipairs
23+
local log = core.log
24+
25+
local _M = {
26+
version = 0.1,
27+
priority = 1000,
28+
name = "zookeeper",
29+
schema = schema.schema,
30+
}
31+
32+
-- Global Configuration
33+
local local_conf
34+
-- Service Instance Cache(service_name -> {nodes, expire_time})
35+
local instance_cache = core.lrucache.new({
36+
ttl = 3600,
37+
count = 1024
38+
})
39+
40+
-- Timer Identifier
41+
local fetch_timer
42+
43+
-- The instance list of a single service from ZooKeeper
44+
local function fetch_service_instances(conf, service_name)
45+
-- 1. Init connect
46+
local client, err = utils.new_zk_client(conf)
47+
if not client then
48+
return nil, err
49+
end
50+
51+
-- 2. TODO: Create path
52+
local service_path = conf.root_path .. "/" .. service_name
53+
local ok, err = utils.create_zk_path(client, service_path)
54+
if not ok then
55+
utils.close_zk_client(client)
56+
return nil, err
57+
end
58+
59+
-- 3. All instance nodes under a service
60+
local children, err = client:get_children(service_path)
61+
if not children then
62+
utils.close_zk_client(client)
63+
if err == "not exists" then
64+
log.warn("service path not exists: ", service_path)
65+
return {}
66+
end
67+
log.error("get zk children failed: ", err)
68+
return nil, err
69+
end
70+
71+
-- 4. Parse the data of each instance node one by one
72+
local instances = {}
73+
for _, child in ipairs(children) do
74+
local instance_path = service_path .. "/" .. child
75+
local data, stat, err = client:get(instance_path)
76+
if not data then
77+
log.error("get instance data failed: ", instance_path, " stat:", stat, " err: ", err)
78+
goto continue
79+
end
80+
81+
-- Parse instance data
82+
local instance = utils.parse_instance_data(data)
83+
if instance then
84+
table.insert(instances, instance)
85+
end
86+
87+
::continue::
88+
end
89+
90+
-- 5. Close connects
91+
utils.close_zk_client(client)
92+
93+
log.debug("fetch service instances: ", service_name, " count: ", #instances)
94+
return instances
95+
end
96+
97+
-- Scheduled fetch of all service instances (full cache update))
98+
local function fetch_all_services()
99+
if not local_conf then
100+
log.warn("zookeeper discovery config not loaded")
101+
return
102+
end
103+
104+
-- 1. Init Zookeeper client
105+
local client, err = utils.new_zk_client(local_conf)
106+
if not client then
107+
log.error("init zk client failed: ", err)
108+
return
109+
end
110+
111+
-- 2. All instance nodes under a service
112+
local services, err = client:get_children(local_conf.root_path)
113+
if not services then
114+
utils.close_zk_client(client)
115+
log.error("get zk root children failed: ", err)
116+
return
117+
end
118+
119+
-- 3. Fetch the instances of each service and update the cache
120+
local now = ngx.time()
121+
for _, service in ipairs(services) do
122+
local instances, err = fetch_service_instances(local_conf, service)
123+
if instances then
124+
instance_cache:set(service, nil, {
125+
nodes = instances,
126+
expire_time = now + local_conf.cache_ttl
127+
})
128+
else
129+
log.error("fetch service instances failed: ", service, " err: ", err)
130+
end
131+
end
132+
133+
-- 4. Close connects
134+
utils.close_zk_client(client)
135+
end
136+
137+
function _M.nodes(service_name)
138+
if not service_name or service_name == "" then
139+
log.error("service name is empty")
140+
return nil
141+
end
142+
143+
-- 1. Get from cache
144+
local cache = instance_cache:get(service_name)
145+
local now = ngx.time()
146+
147+
-- 2. If the cache is missed or expired, actively pull (the data))
148+
if not cache or cache.expire_time < now then
149+
log.debug("cache miss or expired, fetch from zk: ", service_name)
150+
local instances, err = fetch_service_instances(local_conf, service_name)
151+
if not instances then
152+
log.error("fetch instances failed: ", service_name, " err: ", err)
153+
-- Fallback: Return the old cache (if available))
154+
if cache then
155+
return cache.nodes
156+
end
157+
return nil
158+
end
159+
160+
-- Update the cache
161+
cache = {
162+
nodes = instances,
163+
expire_time = now + local_conf.cache_ttl
164+
}
165+
instance_cache:set(service_name, nil, cache)
166+
end
167+
168+
return cache.nodes
169+
end
170+
171+
function _M.check_schema(conf)
172+
return schema.check(conf)
173+
end
174+
175+
function _M.init_worker()
176+
-- Load configuration
177+
local core_config = core.config.local_conf
178+
local_conf = core_config.discovery and core_config.discovery.zookeeper or {}
179+
local ok, err = schema.check(local_conf)
180+
if not ok then
181+
log.error("invalid zookeeper discovery config: ", err)
182+
return
183+
end
184+
185+
-- The default values
186+
local_conf.connect_string = local_conf.connect_string or "127.0.0.1:2181"
187+
local_conf.fetch_interval = local_conf.fetch_interval or 10
188+
local_conf.cache_ttl = local_conf.cache_ttl or 30
189+
190+
-- Start the timer
191+
if not fetch_timer then
192+
fetch_timer = ngx.timer.every(local_conf.fetch_interval, fetch_all_services)
193+
log.info("zk discovery fetch timer started, interval: ", local_conf.fetch_interval, "s")
194+
end
195+
196+
-- Manually execute a full pull immediately
197+
ngx.timer.at(0, fetch_all_services)
198+
end
199+
200+
function _M.destroy()
201+
if fetch_timer then
202+
fetch_timer = nil
203+
end
204+
instance_cache:flush_all()
205+
log.info("zookeeper discovery destroyed")
206+
end
207+
208+
return _M
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
--
2+
-- Licensed to the Apache Software Foundation (ASF) under one or more
3+
-- contributor license agreements. See the NOTICE file distributed with
4+
-- this work for additional information regarding copyright ownership.
5+
-- The ASF licenses this file to You under the Apache License, Version 2.0
6+
-- (the "License"); you may not use this file except in compliance with
7+
-- the License. You may obtain a copy of the License at
8+
--
9+
-- http://www.apache.org/licenses/LICENSE-2.0
10+
--
11+
-- Unless required by applicable law or agreed to in writing, software
12+
-- distributed under the License is distributed on an "AS IS" BASIS,
13+
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
-- See the License for the specific language governing permissions and
15+
-- limitations under the License.
16+
--
17+
18+
local core = require("apisix.core")
19+
20+
local schema = {
21+
type = "object",
22+
properties = {
23+
-- ZooKeeper Cluster Addresses (separated by commas for multiple addresses)
24+
connect_string = {
25+
type = "string",
26+
default = "127.0.0.1:2181"
27+
},
28+
-- ZooKeeper Session Timeout (milliseconds)
29+
session_timeout = {
30+
type = "integer",
31+
minimum = 1000,
32+
default = 30000
33+
},
34+
-- ZooKeeper Connect Timeout (milliseconds)
35+
connect_timeout = {
36+
type = "integer",
37+
minimum = 1000,
38+
default = 5000
39+
},
40+
-- Service Discovery Root Path
41+
root_path = {
42+
type = "string",
43+
default = "/apisix/discovery/zk"
44+
},
45+
-- Instance Fetch Interval (seconds)
46+
fetch_interval = {
47+
type = "integer",
48+
minimum = 1,
49+
default = 10
50+
},
51+
-- The default weight value for service instances that do not specify a weight in ZooKeeper.
52+
-- It is used for load balancing (higher weight means more traffic).
53+
-- Default value is 100, and the value range is 1-500.
54+
weight = {
55+
type = "integer",
56+
minimum = 1,
57+
default = 100
58+
},
59+
-- ZooKeeper Authentication Information (digest: username:password):
60+
-- Digest authentication credentials for accessing ZooKeeper cluster.
61+
-- Format requirement: "digest:{username}:{password}".
62+
-- Leave empty to disable authentication (not recommended for production).
63+
auth = {
64+
type = "object",
65+
properties = {
66+
type = {type = "string", enum = {"digest"}, default = "digest"},
67+
creds = {type = "string"} -- 格式: username:password
68+
}
69+
},
70+
-- Cache Expiration Time (seconds):
71+
-- The time after which service instance cache becomes expired.
72+
-- Default value is 60 seconds
73+
cache_ttl = {
74+
type = "integer",
75+
minimum = 1,
76+
default = 60
77+
}
78+
},
79+
required = {},
80+
additionalProperties = false
81+
}
82+
83+
local _M = {
84+
schema = schema
85+
}
86+
87+
function _M.check(conf)
88+
return core.schema.check(schema, conf)
89+
end
90+
91+
return _M

0 commit comments

Comments
 (0)