Skip to content

Commit

Permalink
Fix (ModelTC#270)
Browse files Browse the repository at this point in the history
Co-authored-by: wangzaijun <[email protected]>
  • Loading branch information
hiworldwzj and wangzaijun committed Dec 28, 2023
1 parent e902568 commit 7b21936
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 86 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -291,4 +291,4 @@ def splitfuse_context_attention_fwd_int8kv(q, k, k_scale, v, v_scale, o,
num_warps=num_warps,
num_stages=1
)
return
return
93 changes: 8 additions & 85 deletions lightllm/server/api_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
from .req_id_generator import ReqIDGenerator

from lightllm.utils.net_utils import alloc_can_use_network_port
from lightllm.utils.start_utils import start_submodule_processes

from .api_models import (
ChatCompletionRequest,
Expand Down Expand Up @@ -407,26 +408,8 @@ def main():
model_rpc_ports = can_use_ports[5:]

if args.enable_multimodal:
pipe_cache_reader, pipe_cache_writer = mp.Pipe(duplex=False)
proc_cache = mp.Process(
target=start_cache_manager,
args=(
cache_port,
args.cache_capacity,
args.cache_reserved_ratio,
pipe_cache_writer,
),
)
proc_cache.start()
# wait embed cache init ready
cache_init_state = pipe_cache_reader.recv()
if cache_init_state != 'init ok':
proc_cache.kill()
logger.error(
"cache init state:" +
str(cache_init_state)
)
sys.exit(1)
start_submodule_processes(start_funcs=[start_cache_manager,],
start_args=[(cache_port, args.cache_capacity, args.cache_reserved_ratio)])

from .httpserver.manager import HttpServerManager
global httpserver_manager
Expand All @@ -438,74 +421,14 @@ def main():
httpserver_port=httpserver_port,
enable_multimodal=args.enable_multimodal,
)

pipe_router_reader, pipe_router_writer = mp.Pipe(duplex=False)
pipe_detoken_reader, pipe_detoken_writer = mp.Pipe(duplex=False)
proc_router = mp.Process(
target=start_router_process,
args=(
args,
router_port,
detokenization_port,
model_rpc_ports,
pipe_router_writer,
),
)
proc_router.start()

from .detokenization.manager import start_detokenization_process
proc_detoken = mp.Process(
target=start_detokenization_process,
args=(
args,
detokenization_port,
httpserver_port,
pipe_detoken_writer,
),
)
proc_detoken.start()

# wait load model ready
router_init_state = pipe_router_reader.recv()
detoken_init_state = pipe_detoken_reader.recv()

if router_init_state != "init ok" or detoken_init_state != "init ok":
proc_router.kill()
proc_detoken.kill()
logger.error(
"router init state: " +
str(router_init_state) +
" detoken init state: " +
str(detoken_init_state)
)
sys.exit(1)

assert proc_router.is_alive() and proc_detoken.is_alive()

start_submodule_processes(start_funcs=[start_router_process, start_detokenization_process],
start_args=[(args, router_port, detokenization_port, model_rpc_ports),
(args, detokenization_port, httpserver_port)])
if args.enable_multimodal:
pipe_visual_reader, pipe_visual_writer = mp.Pipe(duplex=False)
proc_visual = mp.Process(
target=start_visual_process,
args=(
args,
router_port,
visual_port,
cache_port,
pipe_visual_writer
),
)
proc_visual.start()
visual_init_state = pipe_visual_reader.recv()

if visual_init_state != "init ok":
proc_visual.kill()
logger.error(
"visual init state:" +
str(visual_init_state)
)
sys.exit(1)

assert proc_visual.is_alive()
start_submodule_processes(start_funcs=[start_visual_process,],
start_args=[(args, router_port, visual_port, cache_port),])

uvicorn.run(
app,
Expand Down
32 changes: 32 additions & 0 deletions lightllm/utils/start_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import sys
import multiprocessing as mp
from lightllm.utils.log_utils import init_logger
logger = init_logger(__name__)

def start_submodule_processes(start_funcs=(), start_args=()):
    """Launch each submodule in its own process and wait until all are ready.

    Every entry of ``start_funcs`` is started as a ``multiprocessing.Process``.
    It receives the matching entry of ``start_args`` as positional arguments,
    followed by the write end of a one-way pipe. The child is expected to send
    the string ``'init ok'`` through that pipe once it has initialised; any
    other message, or the pipe closing without a message, counts as a failed
    start.

    Args:
        start_funcs: Sequence of process entry points.
        start_args: Sequence of positional-argument sequences, one per entry
            point. The pipe writer is appended as the last argument.

    Raises:
        AssertionError: If the two sequences differ in length, or if a process
            is no longer alive after all of them reported ``'init ok'``.
        SystemExit: With exit code 1 when any submodule fails to initialise;
            every started process is killed first.
    """
    assert len(start_funcs) == len(start_args)
    pipe_readers = []
    processes = []
    for start_func, start_arg in zip(start_funcs, start_args):
        pipe_reader, pipe_writer = mp.Pipe(duplex=False)
        process = mp.Process(
            target=start_func,
            # tuple() lets callers pass the arguments as a list as well.
            args=tuple(start_arg) + (pipe_writer,),
        )
        process.start()
        # Drop the parent's copy of the write end. Otherwise recv() below
        # would block forever when the child dies before reporting, because
        # the pipe would still have an open writer in this process.
        pipe_writer.close()
        pipe_readers.append(pipe_reader)
        processes.append(process)

    # Wait for every submodule to report that it is ready.
    for start_func, pipe_reader in zip(start_funcs, pipe_readers):
        # Entry points such as functools.partial objects have no __name__.
        func_name = getattr(start_func, "__name__", repr(start_func))
        try:
            init_state = pipe_reader.recv()
        except EOFError:
            # All write ends are closed and nothing was sent: the child
            # exited before reporting its init state.
            init_state = "process exited before reporting init state"

        if init_state != 'init ok':
            logger.error(f"init func {func_name} : {str(init_state)}")
            for proc in processes:
                proc.kill()
            sys.exit(1)

        logger.info(f"init func {func_name} : {str(init_state)}")

    assert all(proc.is_alive() for proc in processes)
    return

0 comments on commit 7b21936

Please sign in to comment.