我这种写法兼容redis的协议,可以让Python的redis库连接操作。
Go服务器端代码
// main.go
package main
import (
"bufio"
"fmt"
"log"
"net"
"strconv"
"strings"
"sync"
"time"
)
// 数据存储结构
type DataStore struct {
mu sync.RWMutex
data map[string]string
expires map[string]time.Time
}
// 创建新的数据存储
func NewDataStore() *DataStore {
ds := &DataStore{
data: make(map[string]string),
expires: make(map[string]time.Time),
}
// 启动过期清理协程
go ds.cleanupExpired()
return ds
}
// 定期清理过期的键
func (ds *DataStore) cleanupExpired() {
ticker := time.NewTicker(1 * time.Second)
for range ticker.C {
ds.mu.Lock()
now := time.Now()
for key, expireTime := range ds.expires {
if now.After(expireTime) {
delete(ds.data, key)
delete(ds.expires, key)
}
}
ds.mu.Unlock()
}
}
// SET操作
func (ds *DataStore) Set(key, value string, expire int) {
ds.mu.Lock()
defer ds.mu.Unlock()
ds.data[key] = value
if expire > 0 {
ds.expires[key] = time.Now().Add(time.Duration(expire) * time.Second)
} else {
delete(ds.expires, key)
}
}
// GET操作
func (ds *DataStore) Get(key string) (string, bool) {
ds.mu.RLock()
defer ds.mu.RUnlock()
// 检查是否过期
if expireTime, exists := ds.expires[key]; exists {
if time.Now().After(expireTime) {
return "", false
}
}
value, exists := ds.data[key]
return value, exists
}
// DEL操作
func (ds *DataStore) Del(keys []string) int {
ds.mu.Lock()
defer ds.mu.Unlock()
count := 0
for _, key := range keys {
if _, exists := ds.data[key]; exists {
delete(ds.data, key)
delete(ds.expires, key)
count++
}
}
return count
}
// EXISTS操作
func (ds *DataStore) Exists(keys []string) int {
ds.mu.RLock()
defer ds.mu.RUnlock()
count := 0
for _, key := range keys {
if _, exists := ds.data[key]; exists {
// 检查是否过期
if expireTime, hasExpire := ds.expires[key]; hasExpire {
if time.Now().Before(expireTime) {
count++
}
} else {
count++
}
}
}
return count
}
// KEYS操作
func (ds *DataStore) Keys(pattern string) []string {
ds.mu.RLock()
defer ds.mu.RUnlock()
var keys []string
now := time.Now()
for key := range ds.data {
// 检查是否过期
if expireTime, hasExpire := ds.expires[key]; hasExpire {
if now.After(expireTime) {
continue
}
}
// 简单的模式匹配(只支持*)
if pattern == "*" || strings.Contains(key, strings.ReplaceAll(pattern, "*", "")) {
keys = append(keys, key)
}
}
return keys
}
// EXPIRE操作
func (ds *DataStore) Expire(key string, seconds int) bool {
ds.mu.Lock()
defer ds.mu.Unlock()
if _, exists := ds.data[key]; exists {
ds.expires[key] = time.Now().Add(time.Duration(seconds) * time.Second)
return true
}
return false
}
// TTL操作
func (ds *DataStore) TTL(key string) int {
ds.mu.RLock()
defer ds.mu.RUnlock()
if _, exists := ds.data[key]; !exists {
return -2 // 键不存在
}
if expireTime, hasExpire := ds.expires[key]; hasExpire {
ttl := int(time.Until(expireTime).Seconds())
if ttl < 0 {
return -2 // 已过期
}
return ttl
}
return -1 // 没有设置过期时间
}
// Redis服务器
type RedisServer struct {
store *DataStore
}
// 创建新的Redis服务器
func NewRedisServer() *RedisServer {
return &RedisServer{
store: NewDataStore(),
}
}
// 解析RESP协议
func (s *RedisServer) parseCommand(reader *bufio.Reader) ([]string, error) {
line, err := reader.ReadString('\n')
if err != nil {
return nil, err
}
line = strings.TrimSpace(line)
// 解析数组长度
if !strings.HasPrefix(line, "*") {
return nil, fmt.Errorf("invalid protocol")
}
count, err := strconv.Atoi(line[1:])
if err != nil {
return nil, err
}
args := make([]string, 0, count)
// 读取每个参数
for i := 0; i < count; i++ {
// 读取块大小
line, err = reader.ReadString('\n')
if err != nil {
return nil, err
}
line = strings.TrimSpace(line)
if !strings.HasPrefix(line, "$") {
return nil, fmt.Errorf("invalid protocol")
}
size, err := strconv.Atoi(line[1:])
if err != nil {
return nil, err
}
// 读取实际数据
data := make([]byte, size+2) // +2 for \r\n
_, err = reader.Read(data)
if err != nil {
return nil, err
}
args = append(args, string(data[:size]))
}
return args, nil
}
// 格式化RESP响应
func formatSimpleString(s string) string {
return "+" + s + "\r\n"
}
func formatError(s string) string {
return "-ERR " + s + "\r\n"
}
func formatBulkString(s string) string {
if s == "" {
return "$-1\r\n"
}
return fmt.Sprintf("$%d\r\n%s\r\n", len(s), s)
}
func formatInteger(n int) string {
return fmt.Sprintf(":%d\r\n", n)
}
func formatArray(items []string) string {
if len(items) == 0 {
return "*0\r\n"
}
result := fmt.Sprintf("*%d\r\n", len(items))
for _, item := range items {
result += formatBulkString(item)
}
return result
}
func formatNil() string {
return "$-1\r\n"
}
// 处理命令
func (s *RedisServer) handleCommand(args []string) string {
if len(args) == 0 {
return formatError("empty command")
}
cmd := strings.ToUpper(args[0])
switch cmd {
case "PING":
if len(args) > 1 {
return formatBulkString(args[1])
}
return formatSimpleString("PONG")
case "SET":
if len(args) < 3 {
return formatError("wrong number of arguments")
}
expire := 0
// 检查是否有EX参数
if len(args) >= 5 && strings.ToUpper(args[3]) == "EX" {
expire, _ = strconv.Atoi(args[4])
}
s.store.Set(args[1], args[2], expire)
return formatSimpleString("OK")
case "GET":
if len(args) != 2 {
return formatError("wrong number of arguments")
}
value, exists := s.store.Get(args[1])
if !exists {
return formatNil()
}
return formatBulkString(value)
case "DEL":
if len(args) < 2 {
return formatError("wrong number of arguments")
}
count := s.store.Del(args[1:])
return formatInteger(count)
case "EXISTS":
if len(args) < 2 {
return formatError("wrong number of arguments")
}
count := s.store.Exists(args[1:])
return formatInteger(count)
case "KEYS":
if len(args) != 2 {
return formatError("wrong number of arguments")
}
keys := s.store.Keys(args[1])
return formatArray(keys)
case "EXPIRE":
if len(args) != 3 {
return formatError("wrong number of arguments")
}
seconds, err := strconv.Atoi(args[2])
if err != nil {
return formatError("invalid expire time")
}
ok := s.store.Expire(args[1], seconds)
if ok {
return formatInteger(1)
}
return formatInteger(0)
case "TTL":
if len(args) != 2 {
return formatError("wrong number of arguments")
}
ttl := s.store.TTL(args[1])
return formatInteger(ttl)
case "FLUSHALL":
s.store.mu.Lock()
s.store.data = make(map[string]string)
s.store.expires = make(map[string]time.Time)
s.store.mu.Unlock()
return formatSimpleString("OK")
case "DBSIZE":
s.store.mu.RLock()
size := len(s.store.data)
s.store.mu.RUnlock()
return formatInteger(size)
case "INFO":
info := "# Server\r\nredis_version:mini-redis-go\r\n"
return formatBulkString(info)
default:
return formatError(fmt.Sprintf("unknown command '%s'", cmd))
}
}
// 处理客户端连接
func (s *RedisServer) handleConnection(conn net.Conn) {
defer conn.Close()
reader := bufio.NewReader(conn)
for {
// 解析命令
args, err := s.parseCommand(reader)
if err != nil {
if err.Error() != "EOF" {
log.Printf("Parse error: %v", err)
}
break
}
// 处理命令
response := s.handleCommand(args)
// 发送响应
_, err = conn.Write([]byte(response))
if err != nil {
log.Printf("Write error: %v", err)
break
}
}
}
// 启动服务器
func (s *RedisServer) Start(port string) error {
listener, err := net.Listen("tcp", ":"+port)
if err != nil {
return err
}
defer listener.Close()
log.Printf("Mini Redis server listening on port %s", port)
for {
conn, err := listener.Accept()
if err != nil {
log.Printf("Accept error: %v", err)
continue
}
go s.handleConnection(conn)
}
}
func main() {
server := NewRedisServer()
if err := server.Start("6380"); err != nil {
log.Fatal(err)
}
} Python客户端测试代码
# test_client.py
import redis
import time
# 连接到我们的Go Redis服务器
r = redis.Redis(host='localhost', port=6380, decode_responses=True)
print("=== 测试基本操作 ===")
# 测试PING
print(f"PING: {r.ping()}")
# 测试SET和GET
r.set('name', 'Alice')
print(f"GET name: {r.get('name')}")
r.set('age', '25')
print(f"GET age: {r.get('age')}")
# 测试EXISTS
print(f"EXISTS name: {r.exists('name')}")
print(f"EXISTS unknown: {r.exists('unknown')}")
# 测试DEL
print(f"DEL age: {r.delete('age')}")
print(f"GET age after delete: {r.get('age')}")
# 测试KEYS
r.set('user:1', 'Bob')
r.set('user:2', 'Charlie')
r.set('post:1', 'Hello World')
print(f"KEYS *: {r.keys('*')}")
print(f"KEYS user:*: {r.keys('user:*')}")
print("\n=== 测试过期时间 ===")
# 测试SET with EX
r.set('temp', 'temporary', ex=3)
print(f"GET temp: {r.get('temp')}")
print(f"TTL temp: {r.ttl('temp')}")
print("等待3秒...")
time.sleep(3)
print(f"GET temp after 3s: {r.get('temp')}")
# 测试EXPIRE
r.set('session', 'abc123')
r.expire('session', 5)
print(f"TTL session: {r.ttl('session')}")
# 测试DBSIZE
print(f"\nDBSIZE: {r.dbsize()}")
# 测试FLUSHALL
print(f"FLUSHALL: {r.flushall()}")
print(f"KEYS after flush: {r.keys('*')}")
print("\n=== 性能测试 ===")
# 简单的性能测试
start = time.time()
for i in range(1000):
r.set(f'perf{i}', f'value{i}')
r.get(f'perf{i}')
end = time.time()
print(f"1000次SET+GET操作耗时: {end - start:.2f}秒")
print(f"平均每秒操作数: {2000/(end-start):.0f} ops/s") 运行说明
启动Go服务器:
# 安装依赖(如果需要) go mod init mini-redis go run main.go
安装Python redis库:
pip install redis
运行Python测试:
python test_client.py
功能特性
该迷你Redis服务器实现了以下功能:
支持的命令
PING: 测试连接
SET: 设置键值对(支持EX过期时间)
GET: 获取值
DEL: 删除键
EXISTS: 检查键是否存在
KEYS: 查找匹配的键
EXPIRE: 设置过期时间
TTL: 获取剩余生存时间
FLUSHALL: 清空所有数据
DBSIZE: 获取数据库大小
INFO: 获取服务器信息
特性
完全兼容Redis RESP协议
支持过期时间管理
线程安全的并发访问
自动清理过期键
支持Python redis库的所有基本操作
扩展建议
如果需要更完整的功能,可以添加:
持久化:实现RDB或AOF持久化
更多数据类型:List、Hash、Set、ZSet
发布订阅:PUB/SUB功能
事务:MULTI/EXEC/WATCH
主从复制:实现主从同步
集群支持:实现Redis Cluster协议
性能优化:使用更高效的数据结构
配置文件:支持配置文件加载
网友回复


