Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
195 changes: 194 additions & 1 deletion src/common/backup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@
use crate::common::{Error, Result};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tokio::io::AsyncReadExt;
use tokio::sync::RwLock;

/// Backup type
Expand Down Expand Up @@ -469,7 +471,89 @@ impl BackupManager {
})
.await;

// TODO: Implement checksum verification
if manifest.data_files.is_empty() {
return Err(Error::Other(
"Backup manifest has no data_files to verify".to_string(),
));
}

let total_bytes: u64 = manifest.data_files.iter().map(|f| f.size).sum();
self.update_progress(&restore_id, |p| {
p.total_bytes = total_bytes;
p.processed_bytes = 0;
p.percent_complete = 0.0;
p.rate_bytes_per_sec = 0;
p.eta_seconds = None;
})
.await;

let mut verified_files = Vec::with_capacity(manifest.data_files.len());
let mut processed_bytes = 0u64;

for file in &manifest.data_files {
let file_path = backup_dir.join(&file.path);
if !file_path.exists() {
return Err(Error::NotFound(format!(
"Backup file missing: {}",
file.path
)));
}

let metadata = tokio::fs::metadata(&file_path)
.await
.map_err(|e| Error::Other(format!("Failed to read metadata: {}", e)))?;
let actual_size = metadata.len();
if actual_size != file.size {
return Err(Error::Corrupted(format!(
"File size mismatch for {}: expected {}, got {}",
file.path, file.size, actual_size
)));
}

let (actual_checksum, bytes_read) = compute_sha256_file(&file_path).await?;
let expected_checksum = file.checksum.to_lowercase();
let actual_checksum_lc = actual_checksum.to_lowercase();

if expected_checksum != actual_checksum_lc {
return Err(Error::ChecksumMismatch {
expected: expected_checksum,
actual: actual_checksum_lc,
});
}

processed_bytes = processed_bytes.saturating_add(bytes_read);
let percent = if total_bytes == 0 {
100.0
} else {
(processed_bytes as f64 / total_bytes as f64) * 100.0
};

self.update_progress(&restore_id, |p| {
p.processed_bytes = processed_bytes;
p.percent_complete = percent.min(100.0);
})
.await;

verified_files.push(BackupFile {
path: file.path.clone(),
size: file.size,
checksum: actual_checksum,
encrypted: file.encrypted,
compressed: file.compressed,
});
}

if !manifest.checksum.trim().is_empty() {
let computed_manifest_checksum = compute_manifest_checksum(&verified_files);
let expected_manifest_checksum = manifest.checksum.to_lowercase();
let actual_manifest_checksum = computed_manifest_checksum.to_lowercase();
if expected_manifest_checksum != actual_manifest_checksum {
return Err(Error::ChecksumMismatch {
expected: expected_manifest_checksum,
actual: actual_manifest_checksum,
});
}
}
}

Ok(restore_id)
Expand Down Expand Up @@ -521,6 +605,47 @@ impl BackupManager {
}
}

/// Stream the file at `path` through SHA-256 in fixed-size chunks.
///
/// Returns the hex-encoded digest together with the total number of bytes
/// hashed, so callers can cross-check the byte count against manifest
/// metadata without a second pass over the file.
///
/// # Errors
/// Returns `Error::Other` if the file cannot be opened or read.
async fn compute_sha256_file(path: &Path) -> Result<(String, u64)> {
    // 1 MiB chunks: large enough to amortize syscall cost, small enough
    // to keep the buffer off the hot path of memory pressure.
    const CHUNK_SIZE: usize = 1024 * 1024;

    let mut reader = tokio::fs::File::open(path)
        .await
        .map_err(|e| Error::Other(format!("Failed to open file {:?}: {}", path, e)))?;

    let mut digest = Sha256::new();
    let mut chunk = vec![0u8; CHUNK_SIZE];
    let mut bytes_hashed = 0u64;

    loop {
        let n = reader
            .read(&mut chunk)
            .await
            .map_err(|e| Error::Other(format!("Failed to read file {:?}: {}", path, e)))?;
        // A zero-length read signals end-of-file.
        if n == 0 {
            break;
        }
        digest.update(&chunk[..n]);
        bytes_hashed = bytes_hashed.saturating_add(n as u64);
    }

    Ok((hex::encode(digest.finalize()), bytes_hashed))
}

/// Compute a deterministic SHA-256 checksum over a set of backup files.
///
/// Entries are hashed in ascending path order, so the result is independent
/// of the order in which `files` is supplied. The variable-length path is
/// length-prefixed so distinct file lists cannot collapse to the same byte
/// stream (e.g. "ab" + "c" vs "a" + "bc").
fn compute_manifest_checksum(files: &[BackupFile]) -> String {
    // Sort references instead of cloning every BackupFile (and the Strings
    // inside it). A stable sort of references by path yields the same
    // ordering as the previous clone-and-sort, so the digest is unchanged.
    let mut ordered: Vec<&BackupFile> = files.iter().collect();
    ordered.sort_by(|a, b| a.path.cmp(&b.path));

    let mut hasher = Sha256::new();
    for file in ordered {
        let path_bytes = file.path.as_bytes();
        // Length prefix keeps the variable-width path unambiguous.
        hasher.update((path_bytes.len() as u64).to_le_bytes());
        hasher.update(path_bytes);
        hasher.update(file.size.to_le_bytes());
        hasher.update([file.encrypted as u8]);
        hasher.update([file.compressed as u8]);
        // NOTE(review): `checksum` is not length-prefixed; this encoding is
        // unambiguous only while checksums are fixed-width hex digests —
        // confirm before ever mixing digest algorithms in one manifest.
        hasher.update(file.checksum.as_bytes());
    }
    hex::encode(hasher.finalize())
}

impl BackupType {
fn as_str(self) -> &'static str {
match self {
Expand Down Expand Up @@ -615,4 +740,72 @@ mod tests {
let backups = manager.list_backups().await;
assert_eq!(backups.len(), 2);
}

/// End-to-end check: `start_restore` with `verify_checksums: true` succeeds
/// when every per-file checksum and the manifest checksum are valid.
#[tokio::test]
async fn test_restore_verifies_checksums() {
    let workspace = TempDir::new().unwrap();
    let manager = BackupManager::new(workspace.path());

    // Lay out a minimal on-disk backup: <root>/<backup_id>/data/<chunk>.
    let backup_id = "backup-full-test".to_string();
    let backup_root = workspace.path().join(&backup_id);
    let data_dir = backup_root.join("data");
    tokio::fs::create_dir_all(&data_dir).await.unwrap();

    let chunk_path = data_dir.join("chunk-0001.bin");
    tokio::fs::write(&chunk_path, b"minikv-test-data")
        .await
        .unwrap();

    // Record the file's real checksum and size so verification must pass.
    let (chunk_checksum, chunk_size) = compute_sha256_file(&chunk_path).await.unwrap();
    let data_files = vec![BackupFile {
        path: "data/chunk-0001.bin".to_string(),
        size: chunk_size,
        checksum: chunk_checksum,
        encrypted: false,
        compressed: false,
    }];

    let manifest = BackupManifest {
        id: backup_id.clone(),
        backup_type: BackupType::Full,
        status: BackupStatus::Completed,
        started_at: Utc::now(),
        completed_at: Some(Utc::now()),
        size_bytes: chunk_size,
        key_count: 1,
        checksum: compute_manifest_checksum(&data_files),
        parent_id: None,
        wal_sequence: 0,
        data_files: data_files.clone(),
        config: BackupConfig::default(),
        metadata: HashMap::new(),
    };

    let serialized = serde_json::to_string_pretty(&manifest).unwrap();
    tokio::fs::write(backup_root.join("manifest.json"), serialized)
        .await
        .unwrap();

    let restore_config = RestoreConfig {
        backup_id,
        source: BackupDestination::Local {
            path: workspace.path().to_string_lossy().to_string(),
        },
        target_path: workspace
            .path()
            .join("restore-target")
            .to_string_lossy()
            .to_string(),
        decryption_key: None,
        point_in_time: None,
        parallel_workers: 1,
        verify_checksums: true,
    };

    let restore_id = manager.start_restore(restore_config).await.unwrap();
    assert!(restore_id.starts_with("restore-"));
}
}