mirror of
https://github.com/instructkr/claw-code.git
synced 2026-06-07 23:12:14 +08:00
feat(rag): add claw-rag-service
Adds claw-rag-service for repository indexing and semantic search.
This commit is contained in:
219
rust/crates/claw-rag-service/src/ingest.rs
Normal file
219
rust/crates/claw-rag-service/src/ingest.rs
Normal file
@@ -0,0 +1,219 @@
|
||||
//! Walk workspace and fill `SQLite` + embeddings.
|
||||
|
||||
use std::path::Path;
|
||||
use std::path::PathBuf;
|
||||
|
||||
use reqwest::Client;
|
||||
use walkdir::WalkDir;
|
||||
|
||||
use crate::chunk::chunk_text;
|
||||
use crate::db::{
|
||||
delete_file_and_chunks, file_is_unchanged, insert_chunk, insert_embedding, list_all_files,
|
||||
open_db, upsert_file_meta,
|
||||
};
|
||||
use crate::embed::{embed_batch, EmbedConfig};
|
||||
#[cfg(feature = "qdrant-index")]
|
||||
use crate::qdrant_index::{upsert_points, ChunkPoint};
|
||||
|
||||
const DEFAULT_MAX_FILE_BYTES: u64 = 2 * 1024 * 1024;
|
||||
const CHUNK_CHARS: usize = 900;
|
||||
const CHUNK_OVERLAP: usize = 120;
|
||||
const EMBED_BATCH: usize = 16;
|
||||
|
||||
static SKIP_DIR_NAMES: &[&str] = &[".git", "target", "node_modules", "__pycache__", ".claw-rag"];
|
||||
|
||||
static TEXT_EXTENSIONS: &[&str] = &[
|
||||
"rs", "md", "toml", "txt", "json", "yaml", "yml", "js", "ts", "tsx", "jsx", "py", "go", "c",
|
||||
"h", "cpp", "hpp", "cs", "java", "kt", "swift", "rb", "php", "sh", "ps1", "html", "css", "sql",
|
||||
];
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
pub struct IngestStats {
|
||||
pub files_indexed: usize,
|
||||
pub chunks_total: usize,
|
||||
pub embeddings_written: usize,
|
||||
}
|
||||
|
||||
fn should_skip_dir(path: &Path) -> bool {
|
||||
path.file_name()
|
||||
.and_then(std::ffi::OsStr::to_str)
|
||||
.is_some_and(|n| SKIP_DIR_NAMES.contains(&n))
|
||||
}
|
||||
|
||||
fn is_text_extension(path: &Path) -> bool {
|
||||
path.extension()
|
||||
.and_then(std::ffi::OsStr::to_str)
|
||||
.is_some_and(|e| TEXT_EXTENSIONS.contains(&e.to_ascii_lowercase().as_str()))
|
||||
}
|
||||
|
||||
async fn flush_path_batch(
|
||||
conn: &rusqlite::Connection,
|
||||
path: &str,
|
||||
batch: &mut Vec<(i32, String)>,
|
||||
client: &Client,
|
||||
cfg: &EmbedConfig,
|
||||
stats: &mut IngestStats,
|
||||
) -> Result<(), String> {
|
||||
if batch.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
let texts: Vec<String> = batch.iter().map(|(_, t)| t.clone()).collect();
|
||||
let vecs = embed_batch(client, cfg, &texts).await?;
|
||||
if vecs.len() != batch.len() {
|
||||
return Err("embed batch size mismatch".into());
|
||||
}
|
||||
|
||||
#[cfg(feature = "qdrant-index")]
|
||||
let mut qdrant_points: Vec<ChunkPoint> = Vec::with_capacity(batch.len());
|
||||
|
||||
for ((ord, t), vec) in batch.drain(..).zip(vecs.into_iter()) {
|
||||
let dim = vec.len();
|
||||
let cid = insert_chunk(conn, path, ord, &t)?;
|
||||
insert_embedding(conn, cid, dim, &vec)?;
|
||||
stats.embeddings_written += 1;
|
||||
|
||||
#[cfg(feature = "qdrant-index")]
|
||||
{
|
||||
qdrant_points.push(ChunkPoint {
|
||||
id: cid,
|
||||
vec,
|
||||
path: path.to_string(),
|
||||
text: t,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "qdrant-index")]
|
||||
upsert_points(qdrant_points).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn run_ingest(
|
||||
workspaces: &[PathBuf],
|
||||
db_path: &Path,
|
||||
cfg: &EmbedConfig,
|
||||
client: &Client,
|
||||
) -> Result<IngestStats, String> {
|
||||
let conn = open_db(db_path)?;
|
||||
|
||||
let mut all_files: Vec<(String, PathBuf)> = Vec::new();
|
||||
let mut seen_paths: Vec<String> = Vec::new();
|
||||
|
||||
for ws in workspaces {
|
||||
let workspace = ws
|
||||
.canonicalize()
|
||||
.map_err(|e| format!("workspace: {}: {e}", ws.display()))?;
|
||||
let ws_prefix = workspace.clone();
|
||||
let repo_id = repo_id_for_workspace(&workspace);
|
||||
|
||||
for entry in WalkDir::new(&workspace)
|
||||
.into_iter()
|
||||
.filter_entry(|e| !should_skip_dir(e.path()))
|
||||
{
|
||||
let entry = entry.map_err(|e| e.to_string())?;
|
||||
if !entry.file_type().is_file() {
|
||||
continue;
|
||||
}
|
||||
let path = entry.path();
|
||||
if !is_text_extension(path) {
|
||||
continue;
|
||||
}
|
||||
let meta = entry.metadata().map_err(|e| e.to_string())?;
|
||||
if meta.len() > DEFAULT_MAX_FILE_BYTES {
|
||||
continue;
|
||||
}
|
||||
let rel = path
|
||||
.strip_prefix(&ws_prefix)
|
||||
.unwrap_or(path)
|
||||
.to_string_lossy()
|
||||
.replace('\\', "/");
|
||||
let key = format!("{repo_id}:{rel}");
|
||||
seen_paths.push(key.clone());
|
||||
all_files.push((key, path.to_path_buf()));
|
||||
}
|
||||
}
|
||||
|
||||
all_files.sort_by(|a, b| a.0.cmp(&b.0));
|
||||
seen_paths.sort();
|
||||
|
||||
let mut stats = IngestStats {
|
||||
files_indexed: all_files.len(),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
for (rel, file) in all_files {
|
||||
let Ok(meta) = std::fs::metadata(&file) else {
|
||||
continue;
|
||||
};
|
||||
let size_bytes =
|
||||
i64::try_from(meta.len()).map_err(|_| "file size too large".to_string())?;
|
||||
let mtime_ms = meta
|
||||
.modified()
|
||||
.ok()
|
||||
.and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok())
|
||||
.and_then(|d| i64::try_from(d.as_millis()).ok())
|
||||
.unwrap_or(0);
|
||||
|
||||
let Ok(raw) = std::fs::read_to_string(&file) else {
|
||||
continue;
|
||||
};
|
||||
|
||||
let content_hash = blake3::hash(raw.as_bytes()).to_hex().to_string();
|
||||
if file_is_unchanged(&conn, &rel, &content_hash, size_bytes, mtime_ms)? {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Re-index this file: delete previous chunks (and embeddings) for path.
|
||||
delete_file_and_chunks(&conn, &rel)?;
|
||||
|
||||
let pieces = chunk_text(&raw, CHUNK_CHARS, CHUNK_OVERLAP);
|
||||
if pieces.is_empty() {
|
||||
continue;
|
||||
}
|
||||
|
||||
let mut batch: Vec<(i32, String)> = Vec::new();
|
||||
for (ord, piece) in pieces.into_iter().enumerate() {
|
||||
stats.chunks_total += 1;
|
||||
let ord_i32 =
|
||||
i32::try_from(ord).map_err(|_| "file produced too many chunks".to_string())?;
|
||||
batch.push((ord_i32, piece));
|
||||
if batch.len() >= EMBED_BATCH {
|
||||
flush_path_batch(&conn, &rel, &mut batch, client, cfg, &mut stats).await?;
|
||||
}
|
||||
}
|
||||
flush_path_batch(&conn, &rel, &mut batch, client, cfg, &mut stats).await?;
|
||||
|
||||
let now_ms = std::time::SystemTime::now()
|
||||
.duration_since(std::time::UNIX_EPOCH)
|
||||
.map(|d| i64::try_from(d.as_millis()).unwrap_or(0))
|
||||
.unwrap_or(0);
|
||||
upsert_file_meta(&conn, &rel, &content_hash, size_bytes, mtime_ms, now_ms)?;
|
||||
}
|
||||
|
||||
// Delete entries for files that no longer exist.
|
||||
// (We compare against file list from DB to avoid needing a SQL "NOT IN" temp table.)
|
||||
let mut seen_set = std::collections::BTreeSet::new();
|
||||
for p in &seen_paths {
|
||||
seen_set.insert(p.as_str());
|
||||
}
|
||||
for p in list_all_files(&conn)? {
|
||||
if !seen_set.contains(p.as_str()) {
|
||||
delete_file_and_chunks(&conn, &p)?;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(stats)
|
||||
}
|
||||
|
||||
fn repo_id_for_workspace(workspace: &Path) -> String {
|
||||
let name = workspace
|
||||
.file_name()
|
||||
.and_then(std::ffi::OsStr::to_str)
|
||||
.filter(|s| !s.is_empty())
|
||||
.unwrap_or("workspace");
|
||||
let hash = blake3::hash(workspace.to_string_lossy().as_bytes())
|
||||
.to_hex()
|
||||
.to_string();
|
||||
format!("{name}-{h}", name = name, h = &hash[..8])
|
||||
}
|
||||
Reference in New Issue
Block a user