# import_requests.py
# This script fetches metadata for Rust crates from crates.io, enriches it with AI insights using a local LLM,
# and performs dependency analysis. It also handles retries, caching, and logging.
import requests
import json
import logging
import time
import os
import re
import shutil
import tarfile
import tempfile
import subprocess
import sys
from typing import Optional, Dict, List, Any, Union
from concurrent.futures import ThreadPoolExecutor, as_completed
from bs4 import BeautifulSoup
import tiktoken
import requests_cache
from datetime import datetime
from llama_cpp import Llama

# Constants
MODEL_PATH = os.path.expanduser("~/models/deepseek/deepseek-coder-6.7b-instruct.Q4_K_M.gguf")
LLAMA_BIN = os.path.expanduser("~/llama.cpp/build/bin/llama-cli")
MAX_TOKENS = 256
MODEL_TOKEN_LIMIT = 4096
PROMPT_TOKEN_MARGIN = 3000
CHECKPOINT_INTERVAL = 5  # Save intermediary results every N crates
MAX_RETRIES = 3

# Setup logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler(f"crate_enrichment_{time.strftime('%Y%m%d-%H%M%S')}.log")
    ]
)

# GitHub Token
GITHUB_TOKEN = os.getenv("GITHUB_TOKEN", "")

# Caching
session = requests_cache.CachedSession('crate_cache', expire_after=3600)

# Retry decorator
def retry_with_backoff(max_retries=3, backoff_factor=1.0):
    """Retry decorator with exponential backoff"""
    def decorator(func):
        def wrapper(*args, **kwargs):
            retries, wait_time = 0, backoff_factor
            while retries < max_retries:
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    retries += 1
                    if retries == max_retries:
                        logging.error(f"All {max_retries} retries failed: {str(e)}")
                        return kwargs.get('default_return', None)
                    logging.warning(f"Attempt {retries} failed, retrying in {wait_time:.1f}s: {str(e)}")
                    time.sleep(wait_time)
                    wait_time *= 2
            return None
        return wrapper
    return decorator

def estimate_tokens(prompt: str) -> int:
    encoding = tiktoken.get_encoding("cl100k_base")
    return len(encoding.encode(prompt))

def truncate_content(content: str, max_tokens: int = 1000) -> str:
    paragraphs = content.split("\n\n")
    result, current_tokens = "", 0
    encoding = tiktoken.get_encoding("cl100k_base")
    for para in paragraphs:
        tokens = len(encoding.encode(para))
        if current_tokens + tokens <= max_tokens:
            result += para + "\n\n"
            current_tokens += tokens
        else:
            break
    return result.strip()

def clean_output(output: str, task: str = "general") -> str:
    """Task-specific output cleaning"""
    if not output:
        return ""

    # Remove any remaining prompt artifacts
    output = output.split("<|end|>")[0].strip()

    if task == "classification":
        # For classification tasks, extract just the category
        categories = ["AI", "Database", "Web Framework", "Networking", "Serialization",
                      "Utilities", "DevTools", "ML", "Cryptography", "Unknown"]
        for category in categories:
            if re.search(r'\b' + re.escape(category) + r'\b', output, re.IGNORECASE):
                return category
        return "Unknown"
    elif task == "factual_pairs":
        # For factual pairs, ensure proper formatting
        pairs = []
        facts = re.findall(r'✅\s*Factual:?\s*(.*?)(?=❌|\Z)', output, re.DOTALL)
        counterfacts = re.findall(r'❌\s*Counterfactual:?\s*(.*?)(?=✅|\Z)', output, re.DOTALL)

        # Pair them up
        for i in range(min(len(facts), len(counterfacts))):
            pairs.append(f"✅ Factual: {facts[i].strip()}\n❌ Counterfactual: {counterfacts[i].strip()}")
        return "\n\n".join(pairs)
    else:
        # General cleaning - more permissive than before
        lines = [line.strip() for line in output.splitlines() if line.strip()]
        return "\n".join(lines)

# Load model ONCE at script startup
model = Llama(
    model_path=MODEL_PATH,
    n_ctx=MODEL_TOKEN_LIMIT,  # 4096; must cover PROMPT_TOKEN_MARGIN plus generated tokens
    n_gpu_layers=32,  # Use as many GPU layers as possible
    verbose=False
)

# Then use model for inference
def run_llama(prompt: str, temp: float = 0.2, max_tokens: int = 256) -> Optional[str]:
    """Run the LLM with customizable parameters per task"""
    try:
        token_count = estimate_tokens(prompt)
        if token_count > PROMPT_TOKEN_MARGIN:
            logging.warning(f"Prompt too long ({token_count} tokens). Truncating.")
            prompt = truncate_content(prompt, PROMPT_TOKEN_MARGIN - 100)

        output = model(
            prompt,
            max_tokens=max_tokens,
            temperature=temp,
            stop=["<|end|>", "<|user|>", "<|system|>"]  # Stop at these tokens
        )
        raw_text = output["choices"][0]["text"]
        return clean_output(raw_text)
    except Exception as e:
        logging.error(f"Model inference failed: {str(e)}")
        raise

def summarize_feature(crate_name: str, features: list) -> Optional[str]:
    """Generate summaries for crate features with better prompting"""
    try:
        if not features:
            return "No features documented for this crate."

        # Format features with their dependencies
        feature_text = ""
        for f in features[:8]:  # Limit to 8 features for context size
            feature_name = f.get("name", "")
            deps = f.get("dependencies", [])
            deps_str = ", ".join(deps) if deps else "none"
            feature_text += f"- {feature_name} (dependencies: {deps_str})\n"

        prompt = (
            f"<|system|>You are a Rust programming expert analyzing crate features.\n"
            f"<|user|>For the Rust crate `{crate_name}`, explain these features and what functionality they provide:\n\n"
            f"{feature_text}\n\n"
            f"Provide a concise explanation of each feature's purpose and when a developer would enable it.\n"
            f"<|end|>"
        )

        # Use moderate temperature for informative but natural explanation
        return run_llama(prompt, temp=0.2, max_tokens=350)
    except Exception as e:
        logging.warning(f"Feature summarization failed for {crate_name}: {str(e)}")
        return "Feature summary not available."

def classify_use_case(crate_name: str, desc: str, keywords: list, readme_summary: str = "", key_deps: list = None) -> Optional[str]:
    """Classify the use case of a crate with rich context"""
    key_deps = key_deps or []
    try:
        joined = ", ".join(keywords[:10]) if keywords else "None"
        key_deps_str = ", ".join(key_deps[:5]) if key_deps else "None"
        desc = truncate_content(desc, 300)
        readme_summary = truncate_content(readme_summary, 300)

        # Few-shot prompting with examples
        prompt = (
            f"<|system|>You are a Rust expert classifying crates into the most appropriate category.\n"
            f"<|user|>\n"
            f"# Example 1\n"
            f"Crate: `tokio`\n"
            f"Description: An asynchronous runtime for the Rust programming language\n"
            f"Keywords: async, runtime, futures\n"
            f"Key Dependencies: mio, bytes, parking_lot\n"
            f"Category: Networking\n\n"
            f"# Example 2\n"
            f"Crate: `serde`\n"
            f"Description: A generic serialization/deserialization framework\n"
            f"Keywords: serde, serialization\n"
            f"Key Dependencies: serde_derive\n"
            f"Category: Serialization\n\n"
            f"# Crate to Classify\n"
            f"Crate: `{crate_name}`\n"
            f"Description: {desc}\n"
            f"Keywords: {joined}\n"
            f"README Summary: {readme_summary}\n"
            f"Key Dependencies: {key_deps_str}\n\n"
            f"Category (pick only one): [AI, Database, Web Framework, Networking, Serialization, Utilities, DevTools, ML, Cryptography, Unknown]\n"
            f"<|end|>"
        )

        # Use lower temperature for classification tasks
        result = run_llama(prompt, temp=0.1, max_tokens=20)

        # Extract just the category name using regex
        categories = ["AI", "Database", "Web Framework", "Networking", "Serialization",
                      "Utilities", "DevTools", "ML", "Cryptography", "Unknown"]
        for category in categories:
            if re.search(r'\b' + re.escape(category) + r'\b', result, re.IGNORECASE):
                return category
        return "Unknown"  # Default if no category is found
    except Exception as e:
        logging.warning(f"Classification failed for {crate_name}: {str(e)}")
        return "Unknown"

def score_crate(data: dict) -> float:
    score = (data.get("downloads", 0) / 1000) + (data.get("github_stars", 0) * 10)
    score += len(truncate_content(data.get("readme", ""), 1000)) / 500
    return round(score, 2)

def factual_pairs(crate: dict) -> Optional[str]:
    try:
        # Guard against None values (description can be null, readme_summary may have failed)
        desc = truncate_content(crate.get("description") or "", 300)
        readme_summary = truncate_content(crate.get("readme_summary") or "", 300)

        prompt = (
            f"<|system|>Create 5 factual/counterfactual pairs for the Rust crate. "
            f"Factual statements must be true. Counterfactuals should be plausible but incorrect - "
            f"make them subtle and convincing rather than simple negations.\n"
            f"<|user|>\n"
            f"Crate: {crate['name']}\n"
            f"Description: {desc}\n"
            f"Repo: {crate.get('repository', '')}\n"
            f"README Summary: {readme_summary}\n"
            f"Key Features: {', '.join([f['name'] for f in crate.get('features', [])][:5])}\n\n"
            f"Format each pair as:\n"
            f"✅ Factual: [true statement about the crate]\n"
            f"❌ Counterfactual: [plausible but false statement]\n\n"
            f"Create 5 pairs.\n"
            f"<|end|>"
        )

        # Use higher temperature for creative outputs
        return run_llama(prompt, temp=0.6, max_tokens=400)
    except Exception as e:
        logging.warning(f"Factual pairs generation failed for {crate['name']}: {str(e)}")
        return None

def extract_code_snippets(readme: str) -> list:
    """Extract Rust code snippets from a markdown README"""
    snippets = []
    if not readme:
        return snippets

    # Match fenced code blocks and keep the info string separate from the code,
    # so the language tag is never captured into the snippet itself
    rust_tags = ("rust", "no_run", "ignore", "compile_fail", "mdbook-runnable")
    pattern = r"```([^\n`]*)\n([\s\S]*?)```"
    for info, code in re.findall(pattern, readme):
        info = info.strip().lower()
        # Keep untagged blocks (common for Rust code in crate READMEs) and Rust-tagged blocks
        if info and not info.startswith(rust_tags):
            continue
        if len(code.strip()) > 10:  # Only include non-trivial snippets
            snippets.append(code.strip())
    return snippets[:5]  # Limit to 5 snippets

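# `extract_readme_sections` is called from fetch_crate_metadata() below but was not
# defined in the original paste. The helper below is a minimal sketch, assuming the
# intent was to condense the README into its headed sections so LLM prompts get
# structured context instead of the raw markdown; adjust as needed.
def extract_readme_sections(readme: str, max_sections: int = 6) -> str:
    """Return a condensed string of markdown sections (heading plus body) from a README."""
    if not readme:
        return ""
    sections = []
    current_title, current_body = "Introduction", []
    for line in readme.splitlines():
        if line.startswith("#"):
            # Flush the previous section before starting a new one
            body = "\n".join(current_body).strip()
            if body:
                sections.append(f"{current_title}:\n{body}")
            current_title = line.lstrip("#").strip() or current_title
            current_body = []
        else:
            current_body.append(line)
    body = "\n".join(current_body).strip()
    if body:
        sections.append(f"{current_title}:\n{body}")
    # Keep the first few sections and cap total size to stay within prompt budgets
    return truncate_content("\n\n".join(sections[:max_sections]), 1500)
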
def download_crate_source(crate_name: str, version: str, temp_dir: str) -> Optional[str]:
    """Download and extract crate source code"""
    try:
        url = f"https://crates.io/api/v1/crates/{crate_name}/{version}/download"
        download_path = os.path.join(temp_dir, f"{crate_name}-{version}.tar.gz")

        with session.get(url, stream=True) as r:
            r.raise_for_status()
            with open(download_path, 'wb') as f:
                for chunk in r.iter_content(chunk_size=8192):
                    f.write(chunk)

        extract_path = os.path.join(temp_dir, f"{crate_name}-{version}")
        os.makedirs(extract_path, exist_ok=True)
        with tarfile.open(download_path) as tar:
            tar.extractall(path=extract_path)
        return extract_path
    except Exception as e:
        logging.error(f"Failed to download source for {crate_name}: {str(e)}")
        return None

def analyze_dependencies(crates_data: list) -> dict:
    """Analyze dependencies between crates"""
    dependency_graph = {}
    crate_names = {crate["name"] for crate in crates_data}

    for crate in crates_data:
        deps = []
        for dep in crate.get("dependencies", []):
            if dep.get("crate_id") in crate_names:
                deps.append(dep.get("crate_id"))
        dependency_graph[crate["name"]] = deps

    # Find most depended-upon crates
    reverse_deps = {}
    for crate, deps in dependency_graph.items():
        for dep in deps:
            if dep not in reverse_deps:
                reverse_deps[dep] = []
            reverse_deps[dep].append(crate)

    return {
        "dependency_graph": dependency_graph,
        "reverse_dependencies": reverse_deps,
        "most_depended": sorted(reverse_deps.items(), key=lambda x: len(x[1]), reverse=True)[:10]
    }

def save_checkpoint(data: list, filename_prefix: str = "checkpoint") -> str:
    """Save intermediary results with timestamp"""
    timestamp = time.strftime("%Y%m%d-%H%M%S")
    filename = f"{filename_prefix}_{timestamp}.jsonl"
    with open(filename, "w") as out:
        for item in data:
            out.write(json.dumps(item) + "\n")
    logging.info(f"Saved checkpoint to {filename}")
    return filename

@retry_with_backoff(max_retries=MAX_RETRIES)
def fetch_crate_metadata(crate: str) -> Optional[dict]:
    try:
        r = session.get(f"https://crates.io/api/v1/crates/{crate}")
        r.raise_for_status()
        data = r.json()
        crate_data = data["crate"]
        latest = crate_data["newest_version"]

        # Get readme
        readme_response = session.get(f"https://crates.io/api/v1/crates/{crate}/readme")
        readme = readme_response.text if readme_response.ok else ""

        # Get dependencies
        deps_response = session.get(f"https://crates.io/api/v1/crates/{crate}/{latest}/dependencies")
        deps = deps_response.json().get("dependencies", []) if deps_response.ok else []

        # Get features - using the versions endpoint
        features = []
        versions_response = session.get(f"https://crates.io/api/v1/crates/{crate}/{latest}")
        if versions_response.ok:
            version_data = versions_response.json().get("version", {})
            features_dict = version_data.get("features", {})
            features = [{"name": k, "dependencies": v} for k, v in features_dict.items()]

        # Repository info and GitHub stars (repository can be null in the API response)
        repo = crate_data.get("repository") or ""
        gh_stars = 0

        # Check if it's a GitHub repo
        if "github.com" in repo and GITHUB_TOKEN:
            match = re.search(r"github\.com/([^/]+)/([^/]+)", repo)
            if match:
                owner, repo_name = match.groups()
                repo_name = re.sub(r"\.git$", "", repo_name)  # Strip a trailing .git suffix
                gh_url = f"https://api.github.com/repos/{owner}/{repo_name}"
                gh_headers = {"Authorization": f"token {GITHUB_TOKEN}"}
                gh = session.get(gh_url, headers=gh_headers)
                if gh.ok:
                    gh_data = gh.json()
                    gh_stars = gh_data.get("stargazers_count", 0)

        # Check if it's hosted on lib.rs
        lib_rs_data = {}
        if "lib.rs" in repo:
            lib_rs_url = f"https://lib.rs/crates/{crate}"
            lib_rs_response = session.get(lib_rs_url)
            if lib_rs_response.ok:
                soup = BeautifulSoup(lib_rs_response.text, 'html.parser')
                # Get README from lib.rs if not already available
                if not readme:
                    readme_div = soup.find('div', class_='readme')
                    if readme_div:
                        readme = readme_div.get_text(strip=True)
                # Get lib.rs specific stats
                stats_div = soup.find('div', class_='crate-stats')
                if stats_div:
                    downloads_text = stats_div.find(string=re.compile(r'[\d,]+ downloads'))
                    if downloads_text:
                        lib_rs_data["librs_downloads"] = int(re.sub(r'[^\d]', '', downloads_text))

        # Extract code snippets from readme
        code_snippets = extract_code_snippets(readme)

        # Extract sections from readme
        readme_sections = extract_readme_sections(readme) if readme else ""

        result = {
            "name": crate,
            "version": latest,
            "description": crate_data.get("description", ""),
            "repository": repo,
            "keywords": crate_data.get("keywords", []),
            "categories": crate_data.get("categories", []),
            "readme": readme,
            "downloads": crate_data.get("downloads", 0),
            "github_stars": gh_stars,
            "dependencies": deps,
            "code_snippets": code_snippets,
            "features": features,
            "readme_sections": readme_sections,
            **lib_rs_data
        }
        return result
    except Exception as e:
        logging.error(f"Failed fetching metadata for {crate}: {str(e)}")
        raise

def enrich_crate(crate: dict) -> dict:
    """Apply AI enrichments to crate data"""
    try:
        # First generate a README summary to use in other prompts
        if crate.get("readme"):
            try:
                readme_content = crate.get("readme_sections", "") or truncate_content(crate.get("readme", ""), 2000)
                prompt = f"<|system|>Extract key features from README.\n<|user|>Summarize key aspects of this Rust crate from its README:\n{readme_content}\n<|end|>"
                crate["readme_summary"] = run_llama(prompt, temp=0.3, max_tokens=300)
            except Exception as e:
                logging.warning(f"README summary failed for {crate['name']}: {str(e)}")
                crate["readme_summary"] = None

        # Extract key dependencies for context
        key_deps = [dep.get("crate_id") for dep in crate.get("dependencies", [])[:5] if dep.get("kind") == "normal"]

        # Now use this enriched context for classification
        crate["feature_summary"] = summarize_feature(crate["name"], crate["features"])
        crate["use_case"] = classify_use_case(
            crate["name"],
            crate.get("description") or "",
            crate.get("keywords", []),
            crate.get("readme_summary") or "",  # may be None if summarization failed
            key_deps
        )
        crate["score"] = score_crate(crate)
        crate["factual_counterfactual"] = factual_pairs(crate)
        return crate
    except Exception as e:
        logging.error(f"Failed to enrich {crate['name']}: {str(e)}")
        return crate

def get_crate_list() -> list:
    return [
        # Original crates
        "serde", "tokio", "reqwest", "rand", "clap", "rayon", "uuid", "actix-web", "sqlx", "candle-core", "onnxruntime",
        # ML/AI crates
        "tokenizers", "safetensors", "linfa", "ndarray", "smartcore", "burn", "tract", "tch",
        # Other specialized crates
        "movingai", "ug-metal", "surrealml-core", "tauri",
        # Add more crates here...
    ]

def main():
    start_time = time.time()
    crates = get_crate_list()
    logging.info(f"Fetching and enriching {len(crates)} crates...")

    # Create timestamped output directory
    timestamp = time.strftime("%Y%m%d-%H%M%S")
    output_dir = f"crate_data_{timestamp}"
    os.makedirs(output_dir, exist_ok=True)

    enriched = []

    # Step 1: Fetch metadata for all crates
    with ThreadPoolExecutor(max_workers=4) as pool:
        futures = {pool.submit(fetch_crate_metadata, name): name for name in crates}
        for i, future in enumerate(as_completed(futures)):
            crate_name = futures[future]
            try:
                data = future.result()
                if data:
                    enriched.append(data)
                    logging.info(f"Fetched metadata for {crate_name} ({i+1}/{len(crates)})")

                # Save checkpoint periodically
                if (i+1) % CHECKPOINT_INTERVAL == 0 or i+1 == len(crates):
                    save_checkpoint(enriched, f"{output_dir}/metadata_checkpoint")
            except Exception as e:
                logging.error(f"Failed processing {crate_name}: {str(e)}")

    # Step 2: Enrich crates with AI insights
    for i, crate in enumerate(enriched):
        try:
            logging.info(f"Enriching {crate['name']} ({i+1}/{len(enriched)})")
            enriched[i] = enrich_crate(crate)

            # Save checkpoint periodically
            if (i+1) % CHECKPOINT_INTERVAL == 0 or i+1 == len(enriched):
                save_checkpoint(enriched, f"{output_dir}/ai_enriched_checkpoint")
        except Exception as e:
            logging.error(f"Failed to enrich {crate['name']}: {str(e)}")

    # Step 3: Perform dependency analysis
    logging.info("Analyzing crate dependencies...")
    dependency_analysis = analyze_dependencies(enriched)

    # Save final results
    final_output = f"{output_dir}/enriched_crate_metadata_{timestamp}.jsonl"
    with open(final_output, "w") as out:
        for item in enriched:
            out.write(json.dumps(item) + "\n")

    # Save dependency analysis
    with open(f"{output_dir}/dependency_analysis_{timestamp}.json", "w") as out:
        json.dump(dependency_analysis, out, indent=2)

    # Generate summary report
    summary = {
        "total_crates": len(enriched),
        "total_time": f"{time.time() - start_time:.2f}s",
        "timestamp": datetime.now().isoformat(),
        "most_popular": sorted(enriched, key=lambda x: x.get("score", 0), reverse=True)[:5],
        "most_depended_upon": dependency_analysis["most_depended"][:5]
    }
    with open(f"{output_dir}/summary_report_{timestamp}.json", "w") as out:
        json.dump(summary, out, indent=2)

    logging.info(f"✅ Done. Enriched {len(enriched)} crates in {time.time() - start_time:.2f}s")
    logging.info(f"Results saved to {output_dir}/")

if __name__ == "__main__":
    # Check disk space before starting
    if shutil.disk_usage("/").free < 1_000_000_000:  # 1GB
        logging.warning("Low disk space! This may affect performance.")

    try:
        main()
    except Exception as e:
        logging.critical(f"Script failed: {str(e)}")
        sys.exit(1)
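
# Usage note (not part of the original paste): running the script assumes the
# llama-cpp-python bindings are installed and a GGUF model exists at MODEL_PATH.
# GITHUB_TOKEN is optional; without it, GitHub star counts are skipped.
# Example invocation (token value is illustrative):
#   GITHUB_TOKEN=ghp_yourtoken python import_requests.py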