在Python中实现类似PHP的FastCGI多租户虚拟空间,需要实现目录隔离、权限控制等功能。以下是一个完整的实现方案:
1. 基础FastCGI服务器实现
import os
import sys
import pwd
import grp
import threading
from flup.server.fcgi import WSGIServer
import configparser
from pathlib import Path
class MultiTenantFastCGI:
def __init__(self, config_file):
self.config = configparser.ConfigParser()
self.config.read(config_file)
self.tenants = {}
self.load_tenants()
def load_tenants(self):
"""加载租户配置"""
for section in self.config.sections():
if section.startswith('tenant_'):
tenant_name = section.replace('tenant_', '')
self.tenants[tenant_name] = {
'domain': self.config.get(section, 'domain'),
'root_dir': self.config.get(section, 'root_dir'),
'uid': self.config.getint(section, 'uid'),
'gid': self.config.getint(section, 'gid'),
'max_memory': self.config.getint(section, 'max_memory', fallback=128),
'max_processes': self.config.getint(section, 'max_processes', fallback=5)
} 2. 目录隔离和沙箱环境
import resource
import subprocess
import tempfile
class SandboxEnvironment:
"""沙箱环境管理器"""
def __init__(self, tenant_config):
self.tenant_config = tenant_config
self.root_dir = tenant_config['root_dir']
self.uid = tenant_config['uid']
self.gid = tenant_config['gid']
def setup_chroot(self):
"""设置chroot环境"""
# 创建必要的目录结构
self._prepare_chroot_env()
# 切换到租户目录
os.chdir(self.root_dir)
# 设置chroot (需要root权限)
if os.geteuid() == 0:
os.chroot(self.root_dir)
os.chdir('/')
# 降权到租户用户
os.setgid(self.gid)
os.setuid(self.uid)
def _prepare_chroot_env(self):
"""准备chroot环境所需的基础文件"""
required_dirs = ['/dev', '/proc', '/tmp', '/var/tmp']
for dir_path in required_dirs:
full_path = os.path.join(self.root_dir, dir_path.lstrip('/'))
os.makedirs(full_path, exist_ok=True)
# 复制必要的系统文件
self._copy_system_files()
def _copy_system_files(self):
"""复制必要的系统文件到chroot环境"""
system_files = [
'/etc/resolv.conf',
'/etc/hosts',
'/etc/nsswitch.conf'
]
etc_dir = os.path.join(self.root_dir, 'etc')
os.makedirs(etc_dir, exist_ok=True)
for file_path in system_files:
if os.path.exists(file_path):
dest = os.path.join(self.root_dir, file_path.lstrip('/'))
os.makedirs(os.path.dirname(dest), exist_ok=True)
subprocess.run(['cp', file_path, dest])
def set_resource_limits(self):
"""设置资源限制"""
# 内存限制 (MB转换为字节)
max_memory = self.tenant_config['max_memory'] * 1024 * 1024
resource.setrlimit(resource.RLIMIT_AS, (max_memory, max_memory))
# 进程数限制
max_processes = self.tenant_config['max_processes']
resource.setrlimit(resource.RLIMIT_NPROC, (max_processes, max_processes))
# 文件描述符限制
resource.setrlimit(resource.RLIMIT_NOFILE, (256, 256)) 3. 安全的文件系统操作
import os
import stat
from pathlib import Path
class SecureFileSystem:
"""安全的文件系统操作类"""
def __init__(self, base_path, uid, gid):
self.base_path = Path(base_path).resolve()
self.uid = uid
self.gid = gid
def _validate_path(self, path):
"""验证路径是否在允许范围内"""
try:
resolved_path = Path(path).resolve()
# 确保路径在base_path内
resolved_path.relative_to(self.base_path)
return resolved_path
except (ValueError, RuntimeError):
raise PermissionError(f"Access denied: {path}")
def opendir(self, path):
"""安全的目录打开操作"""
safe_path = self._validate_path(path)
# 检查权限
if not self._check_permission(safe_path, os.R_OK):
raise PermissionError(f"Permission denied: {path}")
if not safe_path.is_dir():
raise NotADirectoryError(f"Not a directory: {path}")
return self._list_directory(safe_path)
def _list_directory(self, path):
"""列出目录内容"""
items = []
for item in path.iterdir():
stat_info = item.stat()
items.append({
'name': item.name,
'type': 'dir' if item.is_dir() else 'file',
'size': stat_info.st_size,
'mtime': stat_info.st_mtime,
'permissions': oct(stat_info.st_mode)[-3:]
})
return items
def _check_permission(self, path, mode):
"""检查文件权限"""
try:
stat_info = path.stat()
# 检查所有者权限
if stat_info.st_uid == self.uid:
return bool(stat_info.st_mode & (mode << 6))
# 检查组权限
if stat_info.st_gid == self.gid:
return bool(stat_info.st_mode & (mode << 3))
# 检查其他用户权限
return bool(stat_info.st_mode & mode)
except OSError:
return False
def read_file(self, path, max_size=10*1024*1024):
"""安全读取文件"""
safe_path = self._validate_path(path)
if not self._check_permission(safe_path, os.R_OK):
raise PermissionError(f"Permission denied: {path}")
if safe_path.stat().st_size > max_size:
raise ValueError(f"File too large: {path}")
with open(safe_path, 'r') as f:
return f.read()
def write_file(self, path, content):
"""安全写入文件"""
safe_path = self._validate_path(path)
parent_dir = safe_path.parent
# 检查父目录写权限
if not self._check_permission(parent_dir, os.W_OK):
raise PermissionError(f"Permission denied: {path}")
with open(safe_path, 'w') as f:
f.write(content)
# 设置文件所有者
os.chown(safe_path, self.uid, self.gid) 4. WSGI应用处理器
class TenantWSGIApp:
"""租户WSGI应用"""
def __init__(self, tenant_name, tenant_config):
self.tenant_name = tenant_name
self.tenant_config = tenant_config
self.sandbox = SandboxEnvironment(tenant_config)
self.fs = SecureFileSystem(
tenant_config['root_dir'],
tenant_config['uid'],
tenant_config['gid']
)
def __call__(self, environ, start_response):
"""WSGI应用入口"""
try:
# 设置沙箱环境
self.setup_environment()
# 获取请求路径
path = environ.get('PATH_INFO', '/')
method = environ.get('REQUEST_METHOD', 'GET')
# 处理请求
if method == 'GET':
response = self.handle_get(path)
elif method == 'POST':
response = self.handle_post(path, environ)
else:
response = self.error_response(405, 'Method Not Allowed')
# 返回响应
status = response['status']
headers = response['headers']
start_response(status, headers)
return [response['body'].encode('utf-8')]
except Exception as e:
return self.handle_error(e, start_response)
def setup_environment(self):
"""设置执行环境"""
# 设置资源限制
self.sandbox.set_resource_limits()
# 如果有root权限,设置chroot
if os.geteuid() == 0:
self.sandbox.setup_chroot()
def handle_get(self, path):
"""处理GET请求"""
# 构建实际文件路径
file_path = os.path.join(self.tenant_config['root_dir'], path.lstrip('/'))
# 如果是目录,列出内容
if os.path.isdir(file_path):
try:
items = self.fs.opendir(file_path)
body = self.render_directory_listing(path, items)
return {
'status': '200 OK',
'headers': [('Content-Type', 'text/html')],
'body': body
}
except PermissionError:
return self.error_response(403, 'Forbidden')
# 如果是文件,返回内容
elif os.path.isfile(file_path):
try:
content = self.fs.read_file(file_path)
return {
'status': '200 OK',
'headers': [('Content-Type', 'text/plain')],
'body': content
}
except PermissionError:
return self.error_response(403, 'Forbidden')
return self.error_response(404, 'Not Found')
def render_directory_listing(self, path, items):
"""渲染目录列表"""
html = f"""
<html>
<head><title>Directory: {path}</title></head>
<body>
<h1>Directory: {path}</h1>
<ul>
"""
for item in items:
name = item['name']
item_type = item['type']
if item_type == 'dir':
html += f'<li><a href="{os.path.join(path, name)}/">[DIR] {name}</a></li>'
else:
html += f'<li><a href="{os.path.join(path, name)}">{name}</a> ({item["size"]} bytes)</li>'
html += """
</ul>
</body>
</html>
"""
return html
def error_response(self, code, message):
"""错误响应"""
return {
'status': f'{code} {message}',
'headers': [('Content-Type', 'text/plain')],
'body': message
} 5. 主服务器启动
def create_tenant_app(tenant_name, config):
"""为每个租户创建独立的应用实例"""
tenant_config = config['tenants'][tenant_name]
return TenantWSGIApp(tenant_name, tenant_config)
def main():
# 配置文件示例
config_content = """
[tenant_user1]
domain = user1.example.com
root_dir = /var/www/tenants/user1
uid = 1001
gid = 1001
max_memory = 128
max_processes = 5
[tenant_user2]
domain = user2.example.com
root_dir = /var/www/tenants/user2
uid = 1002
gid = 1002
max_memory = 256
max_processes = 10
"""
# 初始化多租户FastCGI服务器
server = MultiTenantFastCGI('tenants.conf')
# 路由处理器
def application(environ, start_response):
# 根据域名确定租户
host = environ.get('HTTP_HOST', '')
for tenant_name, config in server.tenants.items():
if config['domain'] == host:
app = create_tenant_app(tenant_name, server)
return app(environ, start_response)
# 默认响应
start_response('404 Not Found', [('Content-Type', 'text/plain')])
return [b'Tenant not found']
# 启动FastCGI服务器
WSGIServer(application, bindAddress=('127.0.0.1', 9000)).run()
if __name__ == '__main__':
main() 6. Nginx配置示例
server {
listen 80;
server_name *.example.com;
location / {
fastcgi_pass 127.0.0.1:9000;
fastcgi_param SCRIPT_FILENAME $document_root$fastcgi_script_name;
fastcgi_param PATH_INFO $fastcgi_path_info;
include fastcgi_params;
}
} 这个实现提供了:
目录隔离:通过chroot和路径验证
权限控制:基于UID/GID的权限检查
资源限制:内存、进程数限制
安全性:路径遍历保护、文件大小限制
多租户支持:基于域名的租户识别
使用时需要注意:
需要root权限才能使用chroot
建议使用容器技术(Docker)增强隔离
定期更新和审计安全配置
网友回复
如何用python实现一个公网代理访问软件?
如何用go实现一个公网代理访问软件?
如何用python实现一个内网穿透打洞程序,实现内网的80端口暴露到公网上可以访问?
如何用go实现一个内网穿透打洞程序,实现内网的80端口暴露到公网上可以访问?
何为Shadowsocks 代理?
python如何实现类似php的opendir目录相互隔离的fastcgi多租户虚拟空间?
nodejs如何实现类似php的opendir目录相互隔离的fastcgi多租户虚拟空间?
如何用html写出网页滚动视频播放卡片视觉差异效果的代码?
程序员如何低成本搭建代理进行科学上网学习技术?
threejs如何做个三维搭积木的游戏?


