Storage for my Data Engineering learning and reference codebase
We will proceed step by step through the outline, starting from Module 1, treating you as a beginner and building concepts with hands-on code examples. I will pick Python as the language to start since it is beginner-friendly, widely used in data engineering, and covered in your outline.
Let’s begin! 🚀
Since the outline begins with Linux basics, we'll start here.
Goal:
- Learn essential Linux commands to navigate and interact with a Linux environment.
- Install VirtualBox or VMware.
- Download an Ubuntu LTS ISO image (a free Linux distribution).
- For Windows users:
- Open PowerShell as Administrator.
- Run the following command:
wsl --install
- Restart your computer and install Ubuntu from the Microsoft Store.
Let’s explore some essential Linux commands:
- Navigation Commands
pwd               # Print the current directory
ls                # List files in the current directory
cd /path/to/dir   # Change directory
cd ~              # Go to home directory
- File and Directory Operations
mkdir my_folder                # Create a directory
touch my_file.txt              # Create an empty file
cp my_file.txt copy_file.txt   # Copy a file
mv my_file.txt new_name.txt    # Rename or move a file
rm new_name.txt                # Delete a file
rmdir my_folder                # Delete a directory (only if empty)
- File Viewing
cat my_file.txt    # View file content
less my_file.txt   # View content page by page
head my_file.txt   # Show first 10 lines
tail my_file.txt   # Show last 10 lines
Practice Task:
- Create a directory named data_engineering_basics.
- Inside that directory:
  - Create a file named linux_notes.txt.
  - Write "Learning Linux Basics" into the file.
  - Copy the file to a new file called backup_linux_notes.txt.
  - List the files and directories.
Commands to Use:
mkdir data_engineering_basics
cd data_engineering_basics
touch linux_notes.txt
echo "Learning Linux Basics" > linux_notes.txt
cp linux_notes.txt backup_linux_notes.txt
ls

Here's an example of manipulating a dataset in Linux:
Task: Create and Process a Simple CSV File
- Create a CSV file using a text editor or the echo command.
echo "Name,Age,Role" > team_data.csv
echo "Alice,30,Engineer" >> team_data.csv
echo "Bob,25,Data Analyst" >> team_data.csv
echo "Charlie,28,Manager" >> team_data.csv
- Display and confirm the file content.
cat team_data.csv
- Use awk to filter rows where the role is "Engineer".
awk -F',' '$3=="Engineer" {print $1, $2, $3}' team_data.csv
Here are additional Linux commands and tasks tailored for data engineering practice, focusing on data manipulation and file handling in Linux environments.
- grep: Search for specific text in a file.
grep "Engineer" team_data.csv     # Find all lines with "Engineer"
grep -i "manager" team_data.csv   # Case-insensitive search
- wc: Count lines, words, or characters in a file.
wc team_data.csv      # Count lines, words, and characters
wc -l team_data.csv   # Count only lines
wc -w team_data.csv   # Count only words
- cut: Extract specific columns from a file.
cut -d',' -f1 team_data.csv   # Extract only the "Name" column
cut -d',' -f2 team_data.csv   # Extract the "Age" column
- sort: Sort the content of a file.
sort team_data.csv                # Sort alphabetically
sort -t',' -k2 -n team_data.csv   # Sort by Age (column 2) numerically
- uniq: Remove duplicate lines (the file must be sorted first).
sort team_data.csv | uniq   # Remove duplicate rows
- Split a large file into smaller chunks:
split -l 10 team_data.csv part_   # Split into chunks of 10 lines each
- Merge multiple files into one:
cat part_* > merged_data.csv
- Find the number of rows in a large dataset:
wc -l large_dataset.csv
- Preview the start or end of a large file:
head -n 5 large_dataset.csv   # View the first 5 lines
tail -n 5 large_dataset.csv   # View the last 5 lines
- Compress a file using gzip:
gzip team_data.csv   # Compress the file
ls                   # Verify the .gz file is created
- Decompress a file:
gunzip team_data.csv.gz
- View compressed data without extracting:
zcat team_data.csv.gz   # View compressed file content
- Combine and compress multiple files:
tar -cvf archive.tar file1.csv file2.csv
gzip archive.tar   # Compress the archive
- Change file permissions:
chmod 644 team_data.csv   # Owner: read/write, Others: read-only
chmod 755 script.sh       # Owner: read/write/execute, Others: read/execute
- View file permissions:
ls -l   # List files with permissions
- Change file ownership:
sudo chown username:groupname team_data.csv
- Check file/directory sizes:
du -sh team_data.csv             # Show size of a file
du -sh data_engineering_basics   # Show size of a directory
- Check overall disk usage:
df -h   # Show available and used disk space
- Redirect output to a file:
grep "Engineer" team_data.csv > engineers.txt
- Pipe commands to combine operations:
cat team_data.csv | grep "Engineer" | wc -l   # Count rows with "Engineer"
- Find large files in a directory:
find . -type f -size +10M   # Find files larger than 10MB
- Download files from the internet:
wget https://example.com/sample.csv
curl -O https://example.com/sample.csv
- Download a sample dataset using wget or curl. Example:
wget https://people.sc.fsu.edu/~jburkardt/data/csv/airtravel.csv
- Perform the following operations:
  - Count the total number of rows and columns in the dataset.
  - Extract only the first column.
  - Sort the dataset by the second column numerically.
  - Find the number of unique entries in the first column.
  - Compress the dataset and verify its size.
- Count rows and columns:
wc -l airtravel.csv                                # Count rows
head -n 1 airtravel.csv | awk -F',' '{print NF}'   # Count columns
- Extract the first column:
cut -d',' -f1 airtravel.csv > first_column.txt
- Sort by the second column:
sort -t',' -k2 -n airtravel.csv > sorted_data.csv
- Find unique entries:
cut -d',' -f1 airtravel.csv | sort | uniq
- Compress the file:
gzip airtravel.csv
To download files in macOS, you can use the Terminal with commands like curl or wget. macOS comes with curl pre-installed, while wget can be installed if needed. Below are the step-by-step instructions:
- Download a File
Use the -O option to save the file with its original name:
curl -O https://example.com/sample.csv
Replace https://example.com/sample.csv with the actual URL of the file.
- Save a File with a Custom Name
Use the -o option to specify a custom filename:
curl -o myfile.csv https://example.com/sample.csv
- Download Multiple Files
Use curl multiple times to download multiple files:
curl -O https://example.com/file1.csv
curl -O https://example.com/file2.csv
- Resume a Download
If the download was interrupted, you can resume it:
curl -C - -O https://example.com/largefile.zip
wget is not pre-installed on macOS but can be installed using Homebrew.
- Install Homebrew (if not already installed). Open Terminal and run:
/bin/bash -c "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/HEAD/install.sh)"
- Install wget:
brew install wget
- Download a File:
wget https://example.com/sample.csv
- Download and Rename the File:
wget -O myfile.csv https://example.com/sample.csv
- Download an Entire Directory (recursive):
wget -r -np https://example.com/directory/
- Open Safari, Chrome, or any browser.
- Enter the file URL directly in the address bar.
- Press Enter.
- The browser will automatically download the file and save it to your Downloads folder.
- Open Terminal.
- Download a sample file using curl:
curl -O https://people.sc.fsu.edu/~jburkardt/data/csv/airtravel.csv
- Confirm the file is downloaded by listing the files in the directory:
ls
- Open the downloaded file using a text editor or command:
cat airtravel.csv
Practice these commands until you’re comfortable.
Once you feel comfortable with basic Linux commands, let's move to the next part of Module 1: setting up the Python environment and learning Python basics.
Python is widely used for data engineering tasks like data cleaning, transformation, and building pipelines. Let’s start with setting up the Python environment.
- Check if Python is already installed:
python3 --version
- If not installed:
  - macOS/Linux: Python 3 is usually pre-installed. On macOS you can install or upgrade it via brew:
brew install python3
  - Windows:
    - Download Python from python.org.
    - Check the option "Add Python to PATH" during installation.
- Confirm installation:
python3 --version
pip3 --version
A virtual environment keeps your Python libraries isolated, which is good practice.
- venv ships with Python 3. If you prefer virtualenv instead, install it with pip:
pip3 install virtualenv
- Create a virtual environment:
python3 -m venv my_env
- Activate the environment:
  - On macOS/Linux:
source my_env/bin/activate
  - On Windows:
.\my_env\Scripts\activate
- Deactivate the virtual environment:
deactivate
Install libraries needed for data engineering:
pip install pandas numpy matplotlib

Check installed packages:
pip list

Let's start with basic Python concepts step-by-step.
Create a file basics.py and write:
# Variables and Data Types
name = "Alice"
age = 30
is_engineer = True
salary = 75000.50
# Print variables
print("Name:", name)
print("Age:", age)
print("Is Engineer:", is_engineer)
print("Salary:", salary)
# Data type checking
print(type(name))
print(type(age))
print(type(is_engineer))
print(type(salary))

Run the code:
python3 basics.py

# List and Loops
team = ["Alice", "Bob", "Charlie"]
# Loop through a list
for member in team:
    print("Team Member:", member)
# Add and remove items
team.append("David")
print("After Adding:", team)
team.remove("Bob")
print("After Removing:", team)Output:
Team Member: Alice
Team Member: Bob
Team Member: Charlie
After Adding: ['Alice', 'Bob', 'Charlie', 'David']
After Removing: ['Alice', 'Charlie', 'David']
Dictionaries are key-value pairs, widely used in data processing.
# Dictionary Example
employee = {
"Name": "Alice",
"Age": 30,
"Role": "Engineer",
"Salary": 75000
}
# Access values
print("Employee Name:", employee["Name"])
# Add new key-value pair
employee["Department"] = "Data Engineering"
print("Updated Dictionary:", employee)
# Loop through keys and values
for key, value in employee.items():
    print(key, ":", value)

Practice Task:
- Create a Python script named python_basics.py.
- Define:
  - Variables: a string, an integer, a float, and a boolean.
  - A list of numbers; compute their sum using a loop.
  - A dictionary to store personal details like name, age, and role.
- Print the values and use a loop to print the dictionary.
Once this is done, we’ll move on to functions, exception handling, and working with files in Python, which will prepare us for real-world data engineering tasks.
Keep practicing! 🚀
Here’s a complete solution to the practice task:
# Variables
name = "John Doe"
age = 25
height = 5.9 # in feet
is_student = True
print("Name:", name)
print("Age:", age)
print("Height:", height)
print("Is Student:", is_student)
# List of numbers and finding their sum
numbers = [10, 20, 30, 40, 50]
total = 0
for number in numbers:
    total += number
print("Sum of Numbers:", total)
# Dictionary to store personal details
personal_details = {
"Name": "John Doe",
"Age": 25,
"Role": "Data Engineering Learner"
}
# Print values from the dictionary
print("Personal Details:")
for key, value in personal_details.items():
print(f"{key}: {value}")Output:
Name: John Doe
Age: 25
Height: 5.9
Is Student: True
Sum of Numbers: 150
Personal Details:
Name: John Doe
Age: 25
Role: Data Engineering Learner
Functions allow you to organize code into reusable blocks.
# Function to calculate the sum of a list of numbers
def calculate_sum(numbers):
    total = 0
    for number in numbers:
        total += number
    return total
# Using the function
numbers = [10, 20, 30, 40, 50]
print("Sum of Numbers:", calculate_sum(numbers))Helps manage errors gracefully.
# Function to divide two numbers with exception handling
def divide_numbers(a, b):
    try:
        result = a / b
    except ZeroDivisionError:
        return "Error: Division by zero is not allowed!"
    except TypeError:
        return "Error: Please provide numbers!"
    else:
        return result
print(divide_numbers(10, 2)) # Valid division
print(divide_numbers(10, 0)) # Division by zero
print(divide_numbers(10, "a"))   # Invalid type

Output:
5.0
Error: Division by zero is not allowed!
Error: Please provide numbers!
Data engineers work heavily with files. Let's learn to read and write files.
# Write data to a file
with open("data.txt", "w") as file:
file.write("Name,Age,Role\n")
file.write("Alice,30,Engineer\n")
file.write("Bob,25,Data Analyst\n")
# Read data from a file
with open("data.txt", "r") as file:
content = file.readlines()
for line in content:
print(line.strip())Output:
Name,Age,Role
Alice,30,Engineer
Bob,25,Data Analyst
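As a complementary sketch (not part of the original example), Python's built-in csv module parses delimited files into structured rows; this assumes the data.txt file created above exists in the working directory:

import csv

# Read the comma-separated file into dictionaries keyed by the header row
with open("data.txt", "r", newline="") as file:
    reader = csv.DictReader(file)
    for row in reader:
        print(row["Name"], "-", row["Role"])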
- Write a function calculate_average to compute the average of a list of numbers.
- Write a program that:
  - Takes a list of numbers from the user.
  - Handles invalid input (e.g., entering a string instead of a number).
  - Saves the valid numbers to a file named numbers.txt.
  - Reads the numbers back from the file and calculates their average.
Here’s a complete solution for the task:
# Function to calculate the average of a list of numbers
def calculate_average(numbers):
    if len(numbers) == 0:
        return 0
    return sum(numbers) / len(numbers)

# Function to get user input and validate it
def get_numbers_from_user():
    numbers = []
    while True:
        user_input = input("Enter a number (or type 'done' to finish): ")
        if user_input.lower() == "done":
            break
        try:
            number = float(user_input)  # Convert input to a float
            numbers.append(number)
        except ValueError:
            print("Invalid input! Please enter a valid number.")
    return numbers

# Function to write numbers to a file
def write_numbers_to_file(numbers, filename):
    with open(filename, "w") as file:
        for number in numbers:
            file.write(f"{number}\n")

# Function to read numbers from a file
def read_numbers_from_file(filename):
    numbers = []
    with open(filename, "r") as file:
        for line in file:
            numbers.append(float(line.strip()))
    return numbers

# Main Program
filename = "numbers.txt"

# Step 1: Get numbers from the user
print("Enter numbers to calculate their average. Type 'done' when finished.")
numbers = get_numbers_from_user()

# Step 2: Write numbers to a file
write_numbers_to_file(numbers, filename)
print(f"Numbers saved to {filename}.")

# Step 3: Read numbers from the file
read_numbers = read_numbers_from_file(filename)
print(f"Numbers read from file: {read_numbers}")

# Step 4: Calculate and display the average
average = calculate_average(read_numbers)
print(f"The average of the numbers is: {average}")

Sample Input/Output:
Enter numbers to calculate their average. Type 'done' when finished.
Enter a number (or type 'done' to finish): 10
Enter a number (or type 'done' to finish): 20
Enter a number (or type 'done' to finish): abc
Invalid input! Please enter a valid number.
Enter a number (or type 'done' to finish): 30
Enter a number (or type 'done' to finish): done
Numbers saved to numbers.txt.
Numbers read from file: [10.0, 20.0, 30.0]
The average of the numbers is: 20.0
- calculate_average(numbers) function:
  - Computes the average of a list of numbers using sum(numbers) / len(numbers).
- User input validation:
  - Accepts input in a loop until the user types "done".
  - Converts valid inputs to floats and appends them to the list.
  - Catches invalid inputs using try-except.
- File writing and reading:
  - Writes each number to a new line in the file numbers.txt.
  - Reads the numbers back from the file and converts them to floats.
Here are key and tricky points that every Python and data engineering learner should keep in mind as they progress, along with examples to illustrate them.
- Lists, dictionaries, and sets are mutable, meaning changes to one reference affect all references.
# Mutable Example
list_a = [1, 2, 3]
list_b = list_a # Both point to the same object
list_b.append(4)
print("list_a:", list_a) # list_a is also modified
print("list_b:", list_b)Output:
list_a: [1, 2, 3, 4]
list_b: [1, 2, 3, 4]
Solution: Create a Copy Instead
list_a = [1, 2, 3]
list_b = list_a.copy() # Create a shallow copy
list_b.append(4)
print("list_a:", list_a) # Original remains unchanged
print("list_b:", list_b)Processing large files efficiently is crucial in data engineering.
Processing large files efficiently is crucial in data engineering.

# Inefficient: Loads the entire file into memory
with open("large_file.txt", "r") as file:
lines = file.readlines()
# Efficient: Reads one line at a time
with open("large_file.txt", "r") as file:
for line in file:
print(line.strip()) # Process each lineUse os or pathlib for portability.
import os
# Portable path handling
base_dir = os.path.dirname(os.path.abspath(__file__)) # Get current directory
file_path = os.path.join(base_dir, "data", "dataset.csv")
print("File Path:", file_path)Logging is more robust and can be configured for different levels (e.g., DEBUG, INFO, ERROR).
Logging is more robust than print statements and can be configured for different levels (e.g., DEBUG, INFO, ERROR).

import logging
# Configure logging
logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")
logging.info("Processing data...")
logging.warning("Missing value encountered!")
logging.error("File not found!")

Output:
INFO: Processing data...
WARNING: Missing value encountered!
ERROR: File not found!
When working with data, missing or invalid values are common.
import pandas as pd
# Sample data with missing values
data = {"Name": ["Alice", "Bob", None], "Age": [30, None, 25]}
df = pd.DataFrame(data)
# Fill missing values
df["Name"].fillna("Unknown", inplace=True)
df["Age"].fillna(df["Age"].mean(), inplace=True)
print(df)Output:
Name Age
0 Alice 30.0
1 Bob 27.5
2 Unknown 25.0
Use chunking to process large datasets efficiently.
import pandas as pd
# Process large CSV file in chunks
chunk_size = 1000
for chunk in pd.read_csv("large_dataset.csv", chunksize=chunk_size):
print("Processing chunk...")
print(chunk.head())Don’t reinvent the wheel—use libraries like Pandas, NumPy, PySpark, etc., for complex tasks.
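Chunking usually goes together with a running aggregate so the full dataset never sits in memory; a minimal sketch (the "Age" column and file name are assumptions):

import pandas as pd

total_rows = 0
age_sum = 0.0
for chunk in pd.read_csv("large_dataset.csv", chunksize=1000):
    total_rows += len(chunk)       # Count rows seen so far
    age_sum += chunk["Age"].sum()  # Accumulate the assumed "Age" column

print("Rows processed:", total_rows)
print("Average Age:", age_sum / total_rows if total_rows else 0)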
Don't reinvent the wheel: use libraries like Pandas, NumPy, PySpark, etc., for complex tasks.

import pandas as pd
# Sample DataFrame
data = {"Name": ["Alice", "Bob", "Charlie"], "Age": [30, 25, 35]}
df = pd.DataFrame(data)
# Add a new column
df["Age in 5 Years"] = df["Age"] + 5
# Filter rows
filtered_df = df[df["Age"] > 30]
print(filtered_df)

Use Python's debugger to troubleshoot.
# Debugging Example
import pdb
def divide(a, b):
    pdb.set_trace()  # Start the debugger
    return a / b

result = divide(10, 0)  # Will raise ZeroDivisionError

For data engineers transitioning from SQL to Python, Pandas allows SQL-like operations.
import pandas as pd
# Sample DataFrame
data = {"Name": ["Alice", "Bob", "Charlie"], "Salary": [70000, 50000, 60000]}
df = pd.DataFrame(data)
# Query with a SQL-like filter
high_salary = df.query("Salary > 55000")
print(high_salary)

- List Comprehension:
# Inefficient loop
squares = []
for x in range(10):
    squares.append(x**2)

# Efficient list comprehension
squares = [x**2 for x in range(10)]

- Use Enumerate Instead of Range:
items = ["apple", "banana", "cherry"]
for index, item in enumerate(items):
    print(f"{index}: {item}")
When integrating Python with databases, optimize SQL queries for large datasets.
import sqlite3
import pandas as pd
# Use LIMIT for large datasets
query = "SELECT Name, Age FROM employees WHERE Age > 30 LIMIT 1000"
connection = sqlite3.connect("database.db")
df = pd.read_sql_query(query, connection)
print(df)

Avoid hardcoding parameters in scripts. Use .env or config files.
from dotenv import load_dotenv
import os
# Load .env file
load_dotenv()
# Read environment variable
db_url = os.getenv("DATABASE_URL")
print("Database URL:", db_url)- Keep Code Modular: Break tasks into reusable functions.
- Use Generators: For large datasets to save memory.
- Automate Testing: Use
pytestorunittestto ensure code reliability. - Secure Sensitive Data: Use
.envfiles for credentials. - Understand the Ecosystem: Familiarize yourself with libraries like Pandas, NumPy, PySpark, and tools like Hadoop and Kafka.
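A minimal generator sketch for the point above: it yields one record at a time instead of loading a whole file into memory (the file name is illustrative):

def read_records(path):
    """Yield one stripped line at a time so only one record is held in memory."""
    with open(path, "r") as file:
        for line in file:
            yield line.strip()

# Consume lazily; works the same for a 1 KB or a 100 GB file
for record in read_records("large_dataset.csv"):
    if "Engineer" in record:
        print(record)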
Once you’ve understood and practiced this, we’ll move to the Big Data section, starting with Hadoop Overview and History, as outlined. Let me know when you’re ready! 🚀
Let me know once you're done with the task or if you need help solving it. After this, we will move to Big Data case studies from the outline. 🚀
Hadoop is a powerful framework for distributed storage and processing of big data. This module focuses on HDFS, YARN, MapReduce, and integrations like Sqoop, Flume, and Hive, covering both theoretical and practical aspects.
HDFS is a distributed storage system that splits files into blocks and distributes them across a cluster. It ensures fault tolerance by replicating blocks across multiple nodes.
- NameNode: Stores metadata and manages the directory structure.
- DataNode: Stores the actual file blocks.
- Secondary NameNode: Periodically merges the NameNode's edit log into its filesystem image (checkpointing) so the metadata stays compact.
- Write Operation: The client writes a file, which is split into blocks and replicated across DataNodes.
- Read Operation: The client queries the NameNode for metadata and retrieves file blocks from DataNodes.
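To make the read path concrete, a client can ask the NameNode for file metadata over the WebHDFS REST API; a minimal sketch assuming a default single-node Hadoop 3.x setup (NameNode HTTP on localhost:9870) and the requests library:

import requests

# Ask the NameNode for the listing of an HDFS directory (metadata only)
url = "http://localhost:9870/webhdfs/v1/user/data?op=LISTSTATUS"
response = requests.get(url)
response.raise_for_status()

for status in response.json()["FileStatuses"]["FileStatus"]:
    print(status["pathSuffix"], status["type"], status["length"])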
To set up a single-node Hadoop environment:
- Prerequisites:
  - Install Java:
sudo apt install openjdk-11-jdk
  - Download and extract Hadoop:
wget https://downloads.apache.org/hadoop/common/hadoop-3.3.5/hadoop-3.3.5.tar.gz
tar -xzvf hadoop-3.3.5.tar.gz
- Configuration:
  - Edit core-site.xml to set the default file system.
  - Edit hdfs-site.xml to configure replication and storage paths.
- Start HDFS:
start-dfs.sh
- Create a Directory:
hdfs dfs -mkdir /user/data
- Upload a File:
hdfs dfs -put localfile.txt /user/data
- List Files:
hdfs dfs -ls /user/data
- Read a File:
hdfs dfs -cat /user/data/localfile.txt
- Delete a File:
hdfs dfs -rm /user/data/localfile.txt
Tip: Use hdfs dfs -du -h to monitor HDFS disk usage efficiently.
YARN (Yet Another Resource Negotiator) manages cluster resources and job scheduling. It separates resource management and job execution into:
- ResourceManager: Allocates cluster resources.
- NodeManager: Monitors node-level resources.
- Map Phase: Splits input data into key-value pairs.
- Shuffle & Sort Phase: Groups data by key.
- Reduce Phase: Processes grouped data.
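Before the streaming scripts below, here is a tiny in-process word-count simulation of the three phases, with the shuffle and sort step approximated by sorting and grouping; purely illustrative:

from itertools import groupby

lines = ["big data big ideas", "data pipelines move data"]

# Map phase: emit (word, 1) pairs
mapped = [(word, 1) for line in lines for word in line.split()]

# Shuffle & sort phase: bring all pairs with the same key together
mapped.sort(key=lambda pair: pair[0])
grouped = groupby(mapped, key=lambda pair: pair[0])

# Reduce phase: aggregate the counts per key
for word, pairs in grouped:
    print(word, sum(count for _, count in pairs))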
Mapper (mapper.py):

#!/usr/bin/env python3
import sys
for line in sys.stdin:
    words = line.strip().split()
    for word in words:
        print(f"{word}\t1")

Reducer (reducer.py):

#!/usr/bin/env python3
import sys
from collections import defaultdict
word_counts = defaultdict(int)
for line in sys.stdin:
    word, count = line.strip().split("\t")
    word_counts[word] += int(count)

for word, count in word_counts.items():
    print(f"{word}\t{count}")

- Upload input data to HDFS:
hdfs dfs -put input.txt /user/data/input
- Execute MapReduce:
hadoop jar /path/to/hadoop-streaming.jar \
  -input /user/data/input \
  -output /user/data/output \
  -mapper mapper.py \
  -reducer reducer.py
- View the Output:
hdfs dfs -cat /user/data/output/part-00000
Tips:
- Use a Combiner for local aggregation to improve performance.
- Use a Partitioner for custom key grouping.
- Install MySQL:
sudo apt install mysql-server
- Create a Database and Table:
CREATE DATABASE sales;
USE sales;
CREATE TABLE orders (id INT, product_name VARCHAR(255), quantity INT);
- Insert Sample Data:
INSERT INTO orders VALUES (1, 'Laptop', 5), (2, 'Mouse', 10);
- Import Data from MySQL to HDFS:
sqoop import --connect jdbc:mysql://localhost/sales \
  --username root --password password \
  --table orders --target-dir /user/data/orders
- Export Data from HDFS to MySQL:
sqoop export --connect jdbc:mysql://localhost/sales \
  --username root --password password \
  --table orders --export-dir /user/data/orders
Flume is used to ingest logs and streaming data into Hadoop.
- Install Flume:
sudo apt install flume-ng
- Configure flume.conf:
agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel1
agent1.sources.source1.type = exec
agent1.sources.source1.command = tail -f /var/log/syslog
agent1.sinks.sink1.type = hdfs
agent1.sinks.sink1.hdfs.path = hdfs://localhost:9000/user/flume/logs
# Channel definition and bindings (required for the agent to start)
agent1.channels.channel1.type = memory
agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1
- Start Flume:
flume-ng agent --conf /path/to/conf --name agent1 -Dflume.root.logger=INFO,console
Hive is a data warehouse infrastructure built on Hadoop for querying and analyzing data using SQL-like syntax.
- Partitioning: Organizes data into sub-directories based on column values (for example, by date).
- Bucketing: Hashes a column's values into a fixed number of buckets (files), which helps with joins and sampling.
- Create Hive Table:
CREATE TABLE games (name STRING, rating INT)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE;
- Load Data:
LOAD DATA INPATH '/user/data/games.csv' INTO TABLE games;

- Query Data:
SELECT name, MAX(rating) AS top_rating FROM games GROUP BY name;
This article covers the theoretical and practical aspects of Module 2 with real-world, reusable examples.
This project demonstrates a real-world e-commerce analytics pipeline built using the Hadoop ecosystem. The pipeline ingests raw user activity logs, processes the data using MapReduce, analyzes it with Hive, and exports aggregated insights to MySQL for reporting.
ecommerce-analytics/
│
├── logs/ # Raw activity logs
├── data/ # HDFS staging area
├── mapper.py # MapReduce mapper script
├── reducer.py # MapReduce reducer script
├── flume/ # Flume configuration files
│ └── flume.conf
├── hive/ # Hive scripts
│ ├── create_table.hql
│ └── analytics_queries.hql
├── sqoop/ # Sqoop commands
│ ├── sqoop_import.sh
│ └── sqoop_export.sh
└── README.md # Documentation
Create a sample log file logs/activity.log:
mkdir -p logs
cat <<EOL > logs/activity.log
timestamp,userid,action,product
2024-12-01T12:00:00,101,search,laptop
2024-12-01T12:05:00,102,click,smartphone
2024-12-01T12:10:00,101,search,smartphone
EOL

Create a flume/flume.conf file:
agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel1
agent1.sources.source1.type = exec
agent1.sources.source1.command = tail -f logs/activity.log
agent1.sinks.sink1.type = hdfs
agent1.sinks.sink1.hdfs.path = hdfs://localhost:9000/user/ecommerce/logs
agent1.channels.channel1.type = memory
agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1

Start Flume:
flume-ng agent --conf flume/ --name agent1 -Dflume.root.logger=INFO,console

Save this as mapper.py:
#!/usr/bin/env python3
import sys
for line in sys.stdin:
    data = line.strip().split(",")
    if len(data) == 4 and data[2] == "search":
        print(f"{data[3]}\t1")  # Emit product and count

Save this as reducer.py:
#!/usr/bin/env python3
import sys
from collections import defaultdict
word_counts = defaultdict(int)
for line in sys.stdin:
    word, count = line.strip().split("\t")
    word_counts[word] += int(count)

for word, count in word_counts.items():
    print(f"{word}\t{count}")

- Upload logs to HDFS:
hdfs dfs -mkdir -p /user/ecommerce/logs
hdfs dfs -put logs/activity.log /user/ecommerce/logs
- Execute MapReduce:
hadoop jar /path/to/hadoop-streaming.jar \
  -input /user/ecommerce/logs \
  -output /user/ecommerce/output \
  -mapper mapper.py \
  -reducer reducer.py
- View Results:
hdfs dfs -cat /user/ecommerce/output/part-00000
Create hive/create_table.hql:
CREATE EXTERNAL TABLE IF NOT EXISTS ecommerce_logs (
timestamp STRING,
userid STRING,
action STRING,
product STRING
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION '/user/ecommerce/logs';

Execute:
hive -f hive/create_table.hql

Create hive/analytics_queries.hql:
SELECT product, COUNT(*) AS search_count
FROM ecommerce_logs
WHERE action = 'search'
GROUP BY product
ORDER BY search_count DESC;

Execute:
hive -f hive/analytics_queries.hql

Create the MySQL target database and table:

CREATE DATABASE ecommerce;
USE ecommerce;
CREATE TABLE product_search_counts (
product VARCHAR(255),
search_count INT
);

Create sqoop/sqoop_export.sh:
sqoop export \
--connect jdbc:mysql://localhost/ecommerce \
--username root --password password \
--table product_search_counts \
--export-dir /user/ecommerce/output \
--fields-terminated-by '\t'

Execute:
sh sqoop/sqoop_export.sh

- Log Monitoring: Use tail and hdfs dfs -tail to debug Flume and HDFS logs.
- MapReduce Optimization:
  - Use a Combiner to perform local aggregation during the Map phase.
  - Implement a custom Partitioner for better load balancing in Reducers.
- Hive Optimization:
  - Partition tables by date for faster queries.
  - Use the ORC file format for efficient storage and query performance.
- Pipeline Automation:
  - Schedule Flume, MapReduce, and Sqoop jobs using Apache Airflow or Oozie (see the sketch after this list).
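A minimal Airflow sketch for the automation point above: it chains the MapReduce job and the Sqoop export with BashOperator. The DAG id, schedule, and paths are assumptions, not part of the original project:

from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="ecommerce_analytics_pipeline",  # hypothetical DAG name
    start_date=datetime(2024, 12, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    run_mapreduce = BashOperator(
        task_id="run_mapreduce",
        bash_command=(
            "hadoop jar /path/to/hadoop-streaming.jar "
            "-input /user/ecommerce/logs -output /user/ecommerce/output "
            "-mapper mapper.py -reducer reducer.py"
        ),
    )

    export_to_mysql = BashOperator(
        task_id="export_to_mysql",
        bash_command="sh sqoop/sqoop_export.sh ",  # trailing space keeps Airflow from treating .sh as a Jinja template file
    )

    # Export to MySQL only after the MapReduce output exists
    run_mapreduce >> export_to_mysql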
This project demonstrates a simplified yet effective end-to-end big data pipeline. You can expand it to include real-time processing with Apache Kafka or Spark Streaming for advanced use cases. Let me know if you’d like to explore further enhancements!