Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import pandas as pd
- import sqlite3
- import os
- import numpy as np
- from datetime import datetime
- def create_sample_data():
- """Create a sample sales.csv file if it doesn't exist"""
- if not os.path.exists("sales.csv"):
- print("Creating sample sales.csv file...")
- np.random.seed(42)
- products = ["Laptop", "Monitor", "Mouse", "Keyboard", "Headphones"]
- regions = ["North", "South", "East", "West", "Central"]
- data = {
- "Date": [datetime.now().strftime("%Y-%m-%d") for _ in range(50)],
- "Product": np.random.choice(products, 50),
- "Region": np.random.choice(regions, 50),
- "Qty": np.random.randint(1, 10, 50),
- "Price": np.random.uniform(10, 1000, 50).round(2),
- "Customer_ID": np.random.randint(1000, 9999, 50),
- }
- sample_df = pd.DataFrame(data)
- sample_df.to_csv("sales.csv", index=False)
- print("Sample sales.csv file created successfully.")
- def extract():
- """Extract data from CSV file"""
- print("\nEXTRACT: Reading data from sales.csv...")
- df = pd.read_csv("sales.csv")
- print(f"Extracted {len(df)} records from sales.csv")
- print("Sample data:")
- print(df.head(3))
- return df
- def transform(df):
- """Transform the data"""
- print("\nTRANSFORM: Processing data...")
- df["Amount"] = (df["Qty"] * df["Price"]).round(2)
- df["Price_Formatted"] = "$" + df["Price"].astype(str)
- df["Product"] = df["Product"].str.upper()
- def categorize_amount(amount):
- if amount < 100:
- return "Small"
- elif amount < 500:
- return "Medium"
- else:
- return "Large"
- df["Sale_Category"] = df["Amount"].apply(categorize_amount)
- print("Transformations applied:")
- print("1. Calculated total amount (Qty * Price)")
- print("2. Formatted price as currency")
- print("3. Converted product names to uppercase")
- print("4. Categorized sales by amount")
- print("\nTransformed data sample:")
- print(df.head(3))
- return df
- def load(df):
- """Load data into SQLite database"""
- print("\nLOAD: Storing data in SQLite database...")
- conn = sqlite3.connect("warehouse.db")
- cursor = conn.cursor()
- cursor.execute(
- """
- DROP TABLE IF EXISTS sales
- """
- )
- df.to_sql("sales", conn, if_exists="replace", index=False)
- print(f"Data loaded into 'sales' table in warehouse.db")
- query = "SELECT COUNT(*) FROM sales"
- result = cursor.execute(query).fetchone()
- print(f"Verification: {result[0]} records in the sales table")
- print("\nTable schema:")
- schema = cursor.execute("PRAGMA table_info(sales)").fetchall()
- for column in schema:
- print(f" {column[1]} ({column[2]})")
- print("\nSample query - Sales by region:")
- query = """
- SELECT Region, COUNT(*) as Sales_Count, SUM(Amount) as Total_Sales
- FROM sales
- GROUP BY Region
- ORDER BY Total_Sales DESC
- """
- region_sales = cursor.execute(query).fetchall()
- for region in region_sales:
- print(f" {region[0]}: {region[1]} sales, ${region[2]:.2f} total")
- conn.close()
- if __name__ == "__main__":
- print("ETL PROCESS DEMONSTRATION")
- print("========================\n")
- create_sample_data()
- df = extract()
- transformed_df = transform(df)
- load(transformed_df)
- print("\nETL process completed successfully!")
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement