在 Linux 上防止指定文件夹下的文件被非法篡改,单纯依靠 Python 或 Go 脚本是无法做到完美实时“阻止”的,因为这些脚本运行在用户态,而真正的文件系统操作发生在内核态。
一个拥有足够权限的恶意程序或用户总能绕过你的脚本。
因此,最佳策略是分层防御:
操作系统层面的预防:利用 Linux 自身强大的权限和属性系统,这是最坚固的第一道防线。
应用程序层面的监控:使用 Python 或 Go 编写脚本,作为第二道防线,用于实时监控、记录和告警,以便在防御被突破时能立即发现并响应。
下面我将详细解释这两种策略,并提供 Python 和 Go 的代码实现。
第一层:利用Linux自身机制(最有效的预防)
在编写任何代码之前,首先应该使用 Linux 的内置工具。这是最直接、最有效、资源消耗最低的方法。
1. 设置严格的文件权限 (chmod, chown)确保文件夹和文件的所有者和权限设置正确。例如,如果文件只需要被特定用户(如 www-data)读取,就不要给予写权限。
# 假设目标文件夹是 /var/www/protected_content
# 将所有权交给一个专用的、低权限的用户,例如 myappuser
sudo chown -R myappuser:myappuser /var/www/protected_content
# 对文件夹设置权限:所有者可读写执行,组用户可读执行,其他用户无权限
sudo chmod 750 /var/www/protected_content
# 对文件设置权限:所有者可读写,组用户可读,其他用户无权限
# 如果文件不需要被程序修改,甚至可以设置为 440 (只读)
sudo find /var/www/protected_content -type f -exec chmod 640 {} \;
# 或者更严格的只读
sudo find /var/www/protected_content -type f -exec chmod 440 {} \; 2. 使用不可变属性 (chattr) - 终极武器这是防止篡改的最强方法。chattr命令可以给文件设置扩展属性。其中 +i (immutable) 属性意味着文件将变得不可变。
一旦文件被设置为 +i 属性,那么:
不能修改文件内容。
不能删除文件。
不能重命名文件。
不能创建硬链接。
甚至 root 用户也无法直接修改它!
如何使用:
# 对单个文件设置不可变属性 sudo chattr +i /var/www/protected_content/config.ini # 对文件夹下所有文件递归设置不可变属性 sudo chattr -R +i /var/www/protected_content # 如果需要合法地修改文件,必须先移除该属性 sudo chattr -i /var/www/protected_content/config.ini
优点:
极度安全,由内核强制执行。
几乎无性能开销。
缺点:
合法更新文件也变得繁琐,需要先 chattr -i,修改完后再 chattr +i。
无法防止新文件的创建。对于文件夹,可以设置 +a (append only),只允许追加内容,但不能修改已有内容。
第二层:使用Python/Go进行监控和告警(检测与响应)
当第一层防御可能因管理疏忽被绕过时(比如管理员忘记设置 chattr),监控脚本就派上用场了。它可以检测到任何预期之外的变更。
主要有两种实现思路:
定期轮询和哈希校验:定时扫描文件夹,计算每个文件的哈希值,与之前保存的“白名单”哈希值进行比对。
实时事件监控 (inotify):利用 Linux 内核的 inotify 机制,当文件系统发生变化时,程序会立即收到通知。这是更高效、更实时的方法。
方案一:哈希校验 (Python & Go)原理:
初始化:扫描目标文件夹,为每个文件计算一个加密哈希(如 SHA-256),将 (文件路径, 哈希值) 的映射关系保存到一个安全的地方(基线数据库)。
监控:定时重复步骤1,将新生成的哈希与基线数据库进行比较。
哈希值不匹配 -> 文件被篡改。
文件存在于基线但现在不见了 -> 文件被删除。
新文件出现 -> 文件夹中出现了未知文件。
Python 实现 (哈希校验)import os
import hashlib
import json
import time
TARGET_DIR = "/path/to/your/protected_folder"
BASELINE_FILE = "/path/to/your/baseline.json"
def calculate_sha256(filepath):
"""计算文件的SHA256哈希值"""
sha256_hash = hashlib.sha256()
try:
with open(filepath, "rb") as f:
# 逐块读取,防止大文件撑爆内存
for byte_block in iter(lambda: f.read(4096), b""):
sha256_hash.update(byte_block)
return sha256_hash.hexdigest()
except IOError:
return None
def create_baseline():
"""创建基线哈希数据库"""
baseline = {}
for dirpath, _, filenames in os.walk(TARGET_DIR):
for filename in filenames:
filepath = os.path.join(dirpath, filename)
file_hash = calculate_sha256(filepath)
if file_hash:
baseline[filepath] = file_hash
with open(BASELINE_FILE, "w") as f:
json.dump(baseline, f, indent=4)
print(f"Baseline created at {BASELINE_FILE}")
def verify_integrity():
"""校验文件完整性"""
try:
with open(BASELINE_FILE, "r") as f:
baseline = json.load(f)
except FileNotFoundError:
print("Baseline file not found. Please create it first.")
return
current_state = {}
for dirpath, _, filenames in os.walk(TARGET_DIR):
for filename in filenames:
filepath = os.path.join(dirpath, filename)
current_state[filepath] = calculate_sha256(filepath)
# 检查篡改和删除
for filepath, original_hash in baseline.items():
if filepath not in current_state:
print(f"ALERT: File deleted! -> {filepath}")
elif current_state[filepath] != original_hash:
print(f"ALERT: File tampered! -> {filepath}")
# 检查新增文件
for filepath in current_state:
if filepath not in baseline:
print(f"ALERT: New file created! -> {filepath}")
if __name__ == "__main__":
# 首次运行时,取消下面这行注释来创建基线
# create_baseline()
# 启动持续监控
print(f"Starting monitoring for {TARGET_DIR}...")
while True:
verify_integrity()
time.sleep(10) # 每10秒检查一次 Go 实现 (哈希校验) package main
import (
"crypto/sha256"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"log"
"os"
"path/filepath"
"time"
)
const (
targetDir = "/path/to/your/protected_folder"
baselineFile = "/path/to/your/baseline.json"
)
func calculateSHA256(filePath string) (string, error) {
file, err := os.Open(filePath)
if err != nil {
return "", err
}
defer file.Close()
hash := sha256.New()
if _, err := io.Copy(hash, file); err != nil {
return "", err
}
return hex.EncodeToString(hash.Sum(nil)), nil
}
func createBaseline() {
baseline := make(map[string]string)
err := filepath.Walk(targetDir, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if !info.IsDir() {
hash, err := calculateSHA256(path)
if err != nil {
log.Printf("Could not hash file %s: %v", path, err)
return nil
}
baseline[path] = hash
}
return nil
})
if err != nil {
log.Fatalf("Error walking directory: %v", err)
}
data, err := json.MarshalIndent(baseline, "", " ")
if err != nil {
log.Fatalf("Could not marshal baseline to JSON: %v", err)
}
if err := ioutil.WriteFile(baselineFile, data, 0644); err != nil {
log.Fatalf("Could not write baseline file: %v", err)
}
fmt.Printf("Baseline created at %s\n", baselineFile)
}
func verifyIntegrity() {
data, err := ioutil.ReadFile(baselineFile)
if err != nil {
log.Printf("Baseline file not found. Please create it first.")
return
}
var baseline map[string]string
if err := json.Unmarshal(data, &baseline); err != nil {
log.Printf("Could not unmarshal baseline: %v", err)
return
}
currentState := make(map[string]string)
filepath.Walk(targetDir, func(path string, info os.FileInfo, err error) error {
if !info.IsDir() {
hash, _ := calculateSHA256(path)
currentState[path] = hash
}
return nil
})
// Check for tampering and deletion
for path, originalHash := range baseline {
if currentHash, ok := currentState[path]; !ok {
fmt.Printf("ALERT: File deleted! -> %s\n", path)
} else if currentHash != originalHash {
fmt.Printf("ALERT: File tampered! -> %s\n", path)
}
}
// Check for new files
for path := range currentState {
if _, ok := baseline[path]; !ok {
fmt.Printf("ALERT: New file created! -> %s\n", path)
}
}
}
func main() {
// First time run, uncomment the line below
// createBaseline()
log.Printf("Starting monitoring for %s...", targetDir)
for {
verifyIntegrity()
time.Sleep(10 * time.Second)
}
} 方案二:实时事件监控 (inotify)
这种方法更优越,因为它几乎是瞬时的,并且资源消耗更低(不需要反复扫描整个目录)。
Python 实现 (watchdog库)watchdog 是一个封装了 inotify 的优秀第三方库。
pip install watchdog
import time
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
TARGET_DIR = "/path/to/your/protected_folder"
class TamperHandler(FileSystemEventHandler):
def on_modified(self, event):
if not event.is_directory:
print(f"ALERT: File modified! -> {event.src_path}")
def on_created(self, event):
print(f"ALERT: New item created! -> {event.src_path}")
def on_deleted(self, event):
print(f"ALERT: Item deleted! -> {event.src_path}")
def on_moved(self, event):
print(f"ALERT: Item moved! from {event.src_path} to {event.dest_path}")
if __name__ == "__main__":
event_handler = TamperHandler()
observer = Observer()
# 递归监控所有子目录
observer.schedule(event_handler, TARGET_DIR, recursive=True)
print(f"Starting real-time monitoring for {TARGET_DIR}...")
observer.start()
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
observer.stop()
observer.join() Go 实现 (fsnotify库)fsnotify 是 Go 生态中用于文件系统通知的事实标准库。
go get github.com/fsnotify/fsnotify
package main
import (
"log"
"os"
"path/filepath"
"github.com/fsnotify/fsnotify"
)
const targetDir = "/path/to/your/protected_folder"
func main() {
watcher, err := fsnotify.NewWatcher()
if err != nil {
log.Fatal(err)
}
defer watcher.Close()
done := make(chan bool)
go func() {
for {
select {
case event, ok := <-watcher.Events:
if !ok {
return
}
// fsnotify.Write: 写入或修改
if event.Op&fsnotify.Write == fsnotify.Write {
log.Println("ALERT: File modified! ->", event.Name)
}
// fsnotify.Create: 创建
if event.Op&fsnotify.Create == fsnotify.Create {
log.Println("ALERT: Item created! ->", event.Name)
// 如果创建的是目录,需要将其也加入监控列表
info, err := os.Stat(event.Name)
if err == nil && info.IsDir() {
watcher.Add(event.Name)
log.Println("Added new directory to watch list:", event.Name)
}
}
// fsnotify.Remove: 删除
if event.Op&fsnotify.Remove == fsnotify.Remove {
log.Println("ALERT: Item deleted! ->", event.Name)
}
// fsnotify.Rename: 重命名 (通常表现为旧文件的删除)
if event.Op&fsnotify.Rename == fsnotify.Rename {
log.Println("ALERT: Item renamed! ->", event.Name)
}
case err, ok := <-watcher.Errors:
if !ok {
return
}
log.Println("ERROR:", err)
}
}
}()
log.Printf("Starting real-time monitoring for %s and its subdirectories...", targetDir)
// 递归添加所有子目录到监控列表
err = filepath.Walk(targetDir, func(path string, info os.FileInfo, err error) error {
if info.IsDir() {
return watcher.Add(path)
}
return nil
})
if err != nil {
log.Fatal("Error adding directories to watcher:", err)
}
<-done
} 总结与最佳实践
首选 chattr +i:对于不需要变动的重要配置文件、核心代码文件等,chattr +i 是最简单、最安全的预防手段。
组合使用:将 chattr 和 inotify 监控脚本结合。chattr 负责核心防护,inotify 脚本负责监控整个文件夹,特别是检测非法新文件的创建(chattr 无法阻止)以及对合法修改流程的审计。
安全基线和脚本:如果你使用哈希校验,务必保护好你的基线文件 (baseline.json)!如果攻击者能修改它,你的校验就毫无意义。同样,监控脚本本身也应该被保护,防止被攻击者停止或修改。
告警机制:上述脚本只是打印到控制台。在实际应用中,你应该将告警信息通过邮件、Webhook(如 Slack, DingTalk)、Syslog 等方式发送给管理员,以便及时响应。
专业工具:对于企业级环境,可以考虑使用专业的HIDS(主机入侵检测系统),如 AIDE (Advanced Intrusion Detection Environment) 或 Tripwire,它们是专门为此目的设计的、更成熟的哈希校验工具。另外,基于eBPF的工具如 Falco 也能提供强大的实时监控能力。
网友回复


