Data Engineering Interview Questions and Answers (2025 Guide)

Data Engineering Interview Questions and Answers (2025 Guide) was originally published on Exponent.

These are some of the most common data engineering interview questions and answers.

Verified: These are real questions, reported and verified by hiring managers and candidates across 100+ interviews.

ETL Pipeline Questions

Data engineers design and implement ETL pipelines to move data from source systems into target data stores.

You’ll need a solid grasp of data transformation, orchestration, and error handling.

A rubric for pipeline design interviews.

ETL Incremental Update

How would you implement an incremental update mechanism in a daily ETL pipeline?

  • This PySpark code performs an incremental ETL job by loading historical data from a Parquet file and new data from a CSV file. 
  • It filters the new data to include only records with an update_time greater than the maximum update_time in the historical dataset, ensuring only new or updated records are processed. 
  • The filtered data is then appended to the existing historical data in Parquet format.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('etl_job').getOrCreate()

# Load historical data
historical_df = spark.read.parquet("/path/to/historical_data")

# Load new records
new_data_df = spark.read.csv("/path/to/daily_data.csv", header=True)

# Assuming each record has a unique id field 'record_id',
# and an 'update_time' field to track modifications.
# Define the incremental load by filtering only new or updated records
max_update_time = historical_df.agg({"update_time": "max"}).first()[0]
latest_df = new_data_df.filter(new_data_df.update_time > max_update_time)

# Write to target, either append or insert into a partition
latest_df.write.mode("append").parquet("/path/to/historical_data")
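
If the target supports upserts (for example, Delta Lake), a MERGE prevents updated records from being appended as duplicates. A minimal sketch, assuming the delta-spark package is available, the target path is already a Delta table, and record_id is the unique key (none of which are stated in the snippet above):

from delta.tables import DeltaTable

# Upsert the filtered increment into the Delta target on the assumed record_id key
target = DeltaTable.forPath(spark, "/path/to/historical_data")

(target.alias("t")
    .merge(latest_df.alias("s"), "t.record_id = s.record_id")
    .whenMatchedUpdateAll()      # refresh rows whose record_id already exists
    .whenNotMatchedInsertAll()   # insert genuinely new rows
    .execute())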

🧠

Remember to ask clarifying questions about any pipeline design.

Clarifying questions:
– “What are the primary business objectives for this ETL pipeline?”
– “What’s the expected data volume and frequency of updates?”
– “Are there any specific latency requirements for data availability?”

Handling Schema Evolution 

How would you handle schema evolution in an ETL pipeline that extracts data from constantly changing APIs?

Question, options, and recommended approach:

  • Is the data schema evolving?
    – Yes: use schema-on-read (e.g., S3, Delta Lake).
    – No: use a simpler approach with a known schema.
  • Does the extraction need to handle schema drift?
    – Yes: implement schema validation mechanisms at query time.
    – No: proceed with static schema extraction.
  • Is event-driven extraction needed?
    – Yes: trigger event-driven extraction based on incoming data.
    – No: use scheduled extraction for predictable data generation.

This PySpark code handles schema evolution when new data contains additional or missing columns.

  • It reads a JSON file into a DataFrame using spark.read.json.
  • A new column, new_column, is added to the DataFrame with default None values to account for any missing fields in the new data.
  • The write operation uses the mergeSchema option, which allows Spark to automatically handle schema evolution when writing to a Parquet file, merging the new schema with the existing one at the target path.

# Example of schema evolution handling using PySpark
from pyspark.sql import functions as F

# Read the new data
dataframe = spark.read.json("/path/to/new_data.json")

# Add default placeholders for missing columns
default_df = dataframe.withColumn("new_column", F.lit(None))

# Leverage Spark's 'mergeSchema' option when writing to handle schema evolution automatically
default_df.write.option("mergeSchema", "true").parquet("/path/to/target_data")

Custom Transformations

Write a custom transformation function to clean data using Python that eliminates null or inconsistent records.

🧠

Things to consider when designing the transformation stage:

What transformations are required? (Cleaning, joining, aggregating, deriving new metrics)

What is the data volume? (Is it large enough to require distributed processing or in-memory optimization?)

What is the complexity of the transformations? (Are you performing simple transformations, complex joins, aggregations, or machine learning tasks?)

How frequently do the transformations need to run? (Real-time, batch, or on-demand)

What are the performance and latency requirements? (Low-latency processing for real-time applications or tolerance for batch processing?)

What are the scalability requirements? (How will the system handle data growth in the long term?)

How to handle failures? (How will the system handle failures and maintain data quality?)

This function cleans a DataFrame by handling missing values and date parsing:

  • It removes rows where the user_id or purchase_date fields are null to ensure critical fields are populated.
  • It converts the purchase_date column to a datetime format, coercing any invalid dates to NaT (Not a Time).
  • It drops rows where the date parsing failed and returns the cleaned DataFrame.

import pandas as pd

def clean_data(df):
    # Drop rows missing critical fields
    df_cleaned = df.dropna(subset=["user_id", "purchase_date"])

    # Parse dates, coercing invalid values to NaT
    df_cleaned["purchase_date"] = pd.to_datetime(
        df_cleaned["purchase_date"], errors="coerce"
    )

    # Drop rows where date parsing failed
    df_cleaned.dropna(subset=["purchase_date"], inplace=True)
    return df_cleaned

clean_data(input_dataframe)

ETL Data Deduplication

How would you implement a data deduplication mechanism in an ETL job that handles real-time streaming records?

The PySpark code below processes a streaming DataFrame and handles the deduplication of records using watermarks:

  • The .withWatermark(“event_timestamp”, “10 minutes”) sets a watermark on the event_timestamp column, allowing late data up to 10 minutes to be processed. After this window, older data is discarded.
  • The .dropDuplicates([“record_id”]) removes duplicate records based on the record_id field, ensuring only unique records are written to the output.
  • The .writeStream.format(“parquet”) writes the deduplicated stream in Parquet format to the specified output path (/path/to/output) as a continuous streaming job.

# Assuming the Kafka stream produces records with a unique identifier 'record_id'
deduplicated_stream = (
    incoming_stream
    .withWatermark("event_timestamp", "10 minutes")
    .dropDuplicates(["record_id"])
)

(deduplicated_stream.writeStream
    .format("parquet")
    .option("path", "/path/to/output")
    .option("checkpointLocation", "/path/to/checkpoint")  # required for streaming file sinks
    .start())

Efficient Backfilling of Missing Data

Explain an approach for efficient backfilling of missing data in a pipeline.

  • Efficient backfilling of missing data begins with identifying the gaps, often through metadata or by querying key fields.
  • Partition the missing data by logical divisions, such as time or region, and process it in parallel to minimize system strain. 
  • Start by prioritizing the most recent missing data and incrementally backfill older gaps. 
  • Use watermarks or checkpoints to track progress, preventing endless reprocessing of outdated data.
  • Ensure the writes are idempotent by using upserts or deduplication to avoid duplicating records. 
  • Monitor the progress and validate the backfilled data to ensure accuracy and completeness.
  • Control the backfilling rate to prevent overloading the pipeline and leverage caching or intermediate storage to optimize processing.
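
A minimal PySpark sketch of this approach (illustrative only; the paths, the dt partition column, the 90-day window, and the record_id key are assumptions):

from datetime import date, timedelta
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("backfill_job").getOrCreate()

source_path = "/path/to/source"   # assumed source location
target_path = "/path/to/target"   # assumed target location, partitioned by dt

# 1. Identify the gaps by comparing expected daily partitions with what the target already has
existing = {row["dt"] for row in spark.read.parquet(target_path).select("dt").distinct().collect()}
expected = {str(date(2024, 1, 1) + timedelta(days=i)) for i in range(90)}  # assumed window
missing = sorted(expected - existing, reverse=True)  # most recent gaps first

# 2. Backfill one partition at a time; overwriting a single partition keeps the job idempotent
for dt in missing:
    (spark.read.parquet(source_path)
        .filter(F.col("dt") == dt)
        .dropDuplicates(["record_id"])   # assumed unique key
        .write.mode("overwrite")
        .parquet(f"{target_path}/dt={dt}"))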

🧠

Remember to ask clarifying questions about any pipeline design.

Clarifying questions:
– “What are the primary business objectives for this ETL pipeline?”
– “What’s the expected data volume and frequency of updates?”
– “Are there any specific latency requirements for data availability?”

How do you handle nulls in Spark?

The common ways to handle null values in Spark are:

  1. Filtering null values
  2. Replacing null values
  3. Dropping rows with null values 
  4. Coalesce
  • To filter rows based on null values in a specific column (or columns), use the .filter() or .where() methods.
  • For example, the code below filters out rows with nulls in the name column, showing only rows where name is not null.

# Create a sample DataFrame with null values
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("NullHandling").getOrCreate()
data = [(1, "Alice"), (2, None), (3, "Bob"), (None, "Eve")]
df = spark.createDataFrame(data, ["id", "name"])

# Filter rows where the 'name' column is NOT null
df_filtered = df.filter(col("name").isNotNull())
df_filtered.show()

  • To replace null values, use the .fillna() method or .na.fill() with either a dictionary for specific columns or a scalar value for all columns.
  • In the example below, null values in name are replaced with “Unknown,” and nulls in id are replaced with -1. You can replace nulls in all columns with a single value if desired.

# Replace null values in the 'name' column with "Unknown" and in 'id' with -1
df_replaced = df.fillna({"name": "Unknown", "id": -1})
df_replaced.show()

  • To drop rows containing null values, use the .dropna() method. You can control the behavior using parameters such as how and thresh.

In the example below:

  • how="any" removes rows with any null values.
  • how="all" removes rows only if all columns have null values.
  • thresh specifies the minimum number of non-null values required to keep a row.

# Drop rows with any null values
df_dropped_any = df.dropna()
df_dropped_any.show()

# Drop rows only if all values in the row are null
df_dropped_all = df.dropna(how="all")
df_dropped_all.show()

# Keep only rows with at least 1 non-null value (thresh=1)
df_dropped_thresh = df.dropna(thresh=1)
df_dropped_thresh.show()

  • The coalesce() function (from pyspark.sql.functions) returns the first non-null value among the given columns, which is useful for substituting fallback values when encountering nulls.
  • In the example below, coalesce returns the first non-null value among name, gender, and id for each row. If name is null, it takes the value from gender or id, in that order. This is particularly useful when multiple columns may contain nulls and a default fallback is needed.

from pyspark.sql.functions import coalesce

# Create a sample DataFrame with multiple columns, some containing nulls
data = [(1, None, "Alice"), (2, "M", None), (3, None, "Bob")]
df_multi = spark.createDataFrame(data, ["id", "gender", "name"])

# Use coalesce to select the first non-null value among the specified columns
df_coalesced = df_multi.withColumn("final_name", coalesce("name", "gender", "id"))
df_coalesced.show()

Coding Questions

Data engineers must write efficient, clean code that manipulates data at scale.

🧠

Read more: Tips for Acing Coding Interviews

Merge Sort Doubly Linked List

You are given the head of a doubly linked list.

Using merge sort, write a function to sort the linked list in ascending or descending order.

📹

Watch a software engineer at TikTok answer the question, “Merge sort a doubly linked list.”

Next, imagine your program is running slowly because it’s repeatedly accessing data from the disk.

To improve performance, you want to build a simple key-value store to cache this data in memory and limit the memory used.

You decide to build a caching system that only keeps the N most recently used items—also known as a least recently used (LRU) cache. Write a class LRUCache(n) that accepts a size limit n.

It should support a set(key, value) method for inserting or updating items and a get(key) method for retrieving items. Can you implement a solution where both methods run in O(1) time?

  • Time Complexity: Merge sort splits the list into halves O(log n) times, and each level of merging touches every node once. Thus, the time complexity is O(n log n), where n is the number of nodes in the list.
  • Space Complexity: The algorithm sorts the list in place by relinking nodes, so it uses only constant auxiliary space beyond the O(log n) recursion stack.
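
A minimal Python sketch of the recursive approach (illustrative; the Node class and function names are assumptions, not from the original answer):

class Node:
    def __init__(self, val):
        self.val = val
        self.prev = None
        self.next = None

def merge_sort(head):
    # Base case: zero or one node is already sorted
    if head is None or head.next is None:
        return head

    # Split the list in half using slow/fast pointers
    slow, fast = head, head.next
    while fast and fast.next:
        slow = slow.next
        fast = fast.next.next
    mid = slow.next
    slow.next = None
    mid.prev = None

    # Sort each half, then merge them in ascending order
    return merge(merge_sort(head), merge_sort(mid))

def merge(a, b):
    dummy = Node(None)
    tail = dummy
    while a and b:
        smaller = a if a.val <= b.val else b
        if smaller is a:
            a = a.next
        else:
            b = b.next
        smaller.prev = tail
        tail.next = smaller
        tail = smaller
    rest = a or b
    if rest:
        rest.prev = tail
    tail.next = rest
    head = dummy.next
    head.prev = None
    return head

For the LRU cache follow-up, a common O(1) sketch uses collections.OrderedDict to track recency:

from collections import OrderedDict

class LRUCache:
    def __init__(self, n):
        self.n = n
        self.data = OrderedDict()

    def get(self, key):
        if key not in self.data:
            return None
        self.data.move_to_end(key)         # mark as most recently used
        return self.data[key]

    def set(self, key, value):
        if key in self.data:
            self.data.move_to_end(key)
        self.data[key] = value
        if len(self.data) > self.n:
            self.data.popitem(last=False)  # evict the least recently used item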

Find Largest Numbers

Let’s say we have a long list of unsorted numbers (potentially millions), and we want to find the M largest numbers contained in it. Implement a function find_largest(input, m) to find and return the largest m values given an input array or file.

Return None or null if the input array is empty.

  • The first m values of the input initialize largest_values; min(largest_values) then tracks its smallest element.
  • largest_values.index(min_val) gets the index of this smallest element so it can be replaced with a new, larger element.
  • The final sorted(largest_values, reverse=True) call is optional, depending on whether you want the results sorted in descending order.

def find_largest(input_list, m):
    # Check for edge cases
    if not input_list or m <= 0:
        return None

    # Start with the first m values, then track the smallest of them
    largest_values = list(input_list[:m])
    min_val = min(largest_values)

    for num in input_list[m:]:
        if num > min_val:
            # Replace the smallest element if the current num is larger
            min_index = largest_values.index(min_val)
            largest_values[min_index] = num
            min_val = min(largest_values)

    # Optional: Sort in descending order
    return sorted(largest_values, reverse=True)

# Example usage:
input_list = [3, 1, 5, 6, 8, 2, 9, 10, 7]
m = 3
print(find_largest(input_list, m))  # Output should be [10, 9, 8]
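
For inputs in the millions, a heap keeps memory bounded at O(m) and avoids the repeated linear scan for the minimum. A sketch using Python's heapq module (an alternative, not the approach described above):

import heapq

def find_largest_heap(input_list, m):
    if not input_list or m <= 0:
        return None
    # heapq.nlargest maintains a min-heap of size m internally: O(n log m) time, O(m) space
    return heapq.nlargest(m, input_list)

print(find_largest_heap([3, 1, 5, 6, 8, 2, 9, 10, 7], 3))  # [10, 9, 8]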

Sudoku Board Solver

Write a function sudokuSolve that checks whether a given sudoku board is solvable. If so, the function returns true. If there is no valid solution to the given sudoku board, it returns false.

  • The get_candidates function generates a list of valid numbers (‘1’ to ‘9’) that can be placed in the given cell (row, col) without causing conflicts in the row, column, or 3×3 sub-grid.
  • The sudoku_solve function attempts to solve the puzzle by identifying the first empty cell (denoted by ‘.’) with the fewest possible candidates. It then tries each candidate recursively, backtracking if a candidate leads to an invalid state.
  • If the board is fully solved (no empty cells left), the function returns True. Otherwise, it backtracks and tries different values until a solution is found or all possibilities are exhausted.

def get_candidates(board, row, col):
    candidates = []

    for chr in '123456789':
        collision = False
        for i in range(9):
            if (board[row][i] == chr or
                    board[i][col] == chr or
                    board[(row - row % 3) + i // 3][(col - col % 3) + i % 3] == chr):
                collision = True
                break

        if not collision:
            candidates.append(chr)

    return candidates

def sudoku_solve(board):
    row, col, candidates = -1, -1, None

    # Find the empty cell with the fewest candidates
    for r in range(9):
        for c in range(9):
            if board[r][c] == '.':
                new_candidates = get_candidates(board, r, c)
                if candidates is None or len(new_candidates) < len(candidates):
                    row, col, candidates = r, c, new_candidates

    # No empty cells left: the board is solved
    if candidates is None:
        return True

    # Try each candidate, backtracking if it leads to an invalid state
    for val in candidates:
        board[row][col] = val
        if sudoku_solve(board):
            return True
        board[row][col] = '.'

    return False

Coin Change

You are given an integer array coins representing different coin denominations and an integer amount representing the total amount of money.

Write a function coinChange that returns the fewest number of coins needed to make up that amount. If that amount cannot be made up by any combination of the coins, return -1.

You may assume that you have infinite coins of different kinds.

  • The dp array stores the minimum number of coins needed to make each amount from 0 to amount, with dp[0] = 0 because zero coins are required to make zero amount.
  • For each amount i, it iterates through each coin denomination and checks if that coin can be used (i.e., if i - coin >= 0), updating dp[i] with the minimum coins needed.
  • Finally, if dp[amount] is still infinity, it means it’s impossible to make that amount, and the function returns -1. Otherwise, it returns the minimum number of coins needed.

from typing import List

def coin_change(coins: List[int], amount: int) -> int:
    # Initialize the DP array with a value greater than the maximum possible number of coins needed
    dp = [float('inf')] * (amount + 1)
    dp[0] = 0  # Base case: 0 coins needed to make amount 0

    # Process each amount from 1 to the given amount
    for i in range(1, amount + 1):
        for coin in coins:
            if i - coin >= 0:
                dp[i] = min(dp[i], dp[i - coin] + 1)

    # If dp[amount] is still infinity, it is not possible to form the amount
    return dp[amount] if dp[amount] != float('inf') else -1
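
A quick usage check (not part of the original answer):

print(coin_change([1, 2, 5], 11))  # 3 (5 + 5 + 1)
print(coin_change([2], 3))         # -1 (the amount cannot be formed)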

Linked List Cycle

Given the head of a linked list, write a function hasCycle to determine if it has a cycle.

A linked list is said to have a cycle if a node’s next pointer points to a previous node in the list, forming a loop. Return true if there is a cycle; otherwise, return false.

Using the Tortoise and Hare algorithm, the solution below detects whether a linked list contains a cycle. The algorithm uses two pointers, slow and fast, that traverse the linked list at different speeds.

class ListNode:
    def __init__(self, val=0, next=None):
        self.val = val
        self.next = next

def has_cycle(head: ListNode) -> bool:
    slow = head
    fast = head

    while fast and fast.next:
        slow = slow.next         # Move the slow pointer by 1 step
        fast = fast.next.next    # Move the fast pointer by 2 steps

        if slow == fast:
            return True          # A cycle is detected

    return False                 # No cycle detected
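
A small usage example (hypothetical nodes, added for illustration):

a, b, c = ListNode(1), ListNode(2), ListNode(3)
a.next, b.next = b, c
c.next = a            # close the loop: c points back to a
print(has_cycle(a))   # True

c.next = None         # break the loop
print(has_cycle(a))   # False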

SQL Questions

You’ll be expected to write complex SQL queries that efficiently extract and manipulate large volumes of data.

Top Earning Employee by Department

Given the database with the schema shown below, write a SQL query to fetch the top earning employee by department, ordered by department name.

Schema

employees: id (int), first_name (varchar), last_name (varchar), salary (int), department_id (int)

projects: id (int), title (varchar), start_date (date), end_date (date), budget (int)

departments: id (int), name (varchar)

employees_projects: project_id (int), employee_id (int)

Query Result Format

department_name (varchar), employee_id (int), first_name (varchar), last_name (varchar), salary (int)

Sample answer:

WITH ranked_employees AS (
    SELECT
        e.id AS employee_id,
        e.first_name,
        e.last_name,
        e.salary,
        d.name AS department_name,
        ROW_NUMBER() OVER (PARTITION BY e.department_id ORDER BY e.salary DESC) AS rank
    FROM employees e
    JOIN departments d ON e.department_id = d.id
)
SELECT
    department_name,
    employee_id,
    first_name,
    last_name,
    salary
FROM ranked_employees
WHERE rank = 1
ORDER BY department_name;

  • The Common Table Expression (CTE) ranked_employees ranks employees within each department based on their salary in descending order. The ROW_NUMBER() function is used to assign a rank to each employee within their department.
  • The main query selects the top-ranked employee (rank = 1) from each department, resulting in only the top earner in each department.
  • The employees table is joined with the departments table to get the department names.
  • The result is then ordered by department name.

Group Tweets

Given a tweets table with tweet_id, user_id, msg, and tweet_date, group the users by the number of tweets they posted in 2022 and count the number of users in each group.

  • The tweet_cte CTE counts tweets per user for 2022, producing user_id and tweet_bucket (the number of tweets per user). The main query then groups users by tweet_bucket and counts how many users fall into each bucket.

WITH tweet_cte AS (
    SELECT user_id, COUNT(*) AS tweet_bucket
    FROM tweets
    WHERE EXTRACT(year FROM tweet_date) = 2022
    GROUP BY user_id
)

SELECT tweet_bucket, COUNT(*) AS users_num
FROM tweet_cte
GROUP BY tweet_bucket;

Success Rate

Given post and post_user tables, write an SQL query that shows the success rate (%) of a post when the user’s previous post had failed.

The post table contains post_id, post_date, user_id, interface, and is_successful_post.

The post_user table contains user_id, user_type, and age.

Your output should have: user_id and next_post_sc_rate (success rate of the post when the user’s previous post had failed). Order results by increasing next_post_sc_rate.

  • The post_seq CTE assigns a sequential ID (post_seq_id) to each post per user based on the post_date.
  • The post_pairings CTE identifies pairs of posts where the previous post was unsuccessful. The next_post_id is the sequential ID of the post following the unsuccessful post.
  • The final select joins post_pairings back to post_seq (matching on user_id and the next sequential ID) to find each post that immediately follows an unsuccessful post, then computes the success rate of those next posts.
  • SUM(p2.is_successful_post)*1.0 / COUNT(p2.is_successful_post) calculates the ratio of successful next posts to the total number of next posts.
  • ROUND(…, 2) rounds the success rate to two decimal places.
  • GROUP BY groups by user_id and orders the results by next_post_sc_rate in ascending order.

Sample answer:

WITH post_seq AS (
    SELECT
        p.user_id,
        p.post_id,
        ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY post_date) AS post_seq_id,
        is_successful_post
    FROM post AS p
),

post_pairings AS (
    SELECT
        ps.user_id,
        ps.post_seq_id AS fail_post_id,
        ps.post_seq_id + 1 AS next_post_id
    FROM post_seq AS ps
    WHERE ps.is_successful_post = 0
)

SELECT
    pp.user_id,
    ROUND(SUM(p2.is_successful_post) * 1.0 / COUNT(p2.is_successful_post), 2) AS next_post_sc_rate
FROM post_pairings AS pp
JOIN post_seq AS p2
    ON pp.user_id = p2.user_id
   AND pp.next_post_id = p2.post_seq_id
GROUP BY 1
ORDER BY next_post_sc_rate ASC;

Top 2 Players

You work for a leading game development company where players can team up and compete. Each player’s performance in different game sessions is recorded as distinct score entries in the database.

You’re provided a players table with player_id, player_name, and team_id columns and a scores table with score_id, player_id, and game_score. Write a SQL query to return the top 2 players from each team based on their single highest score across all sessions.

If multiple players share the same highest score, include all of them, which may result in more than two top players for some teams.

The solution is obtained by finding each player’s highest score across all sessions, ranking players within each team based on their highest score, and selecting the top 2 players from each team. More are chosen in case of ties.

WITH PlayerMaxScores AS (
    SELECT
        p.team_id,
        p.player_name,
        MAX(s.game_score) AS max_score
    FROM players p
    JOIN scores s ON p.player_id = s.player_id
    GROUP BY p.team_id, p.player_name
),
RankedPlayers AS (
    SELECT
        team_id,
        player_name,
        max_score,
        DENSE_RANK() OVER (PARTITION BY team_id ORDER BY max_score DESC) AS rank
    FROM PlayerMaxScores
)
SELECT
    team_id,
    player_name,
    max_score
FROM RankedPlayers
WHERE rank <= 2;

  • The PlayerMaxScores CTE aggregates the maximum score for each player.
  • The DENSE_RANK() window function in the RankedPlayers CTE assigns a rank to each player within their team based on their maximum score. The DENSE_RANK() function ensures that players with the same score get the same rank.
  • The final SELECT picks the top two players from each team.
System Design Questions

Data engineering is about building systems that process massive volumes of data efficiently and reliably.

System design interviews will test your ability to think holistically about how different components fit together in a scalable architecture.

    Spark Job Aggregation

    Write a Spark job that reads a large Parquet file, performs aggregations, and writes it back as a Parquet file.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("AggregationJob").getOrCreate()

# Read the Parquet file
input_df = spark.read.parquet("/s3/path/to/data")

# Perform the aggregation
aggregated_df = input_df.groupBy("user_id").agg({"sku_count": "sum"})

# Write back to Parquet
aggregated_df.write.mode("overwrite").parquet("/s3/path/to/output")

    • A Spark session named AggregationJob is created to facilitate DataFrame operations.
    • The code reads data from a specified S3 path in Parquet format into a DataFrame called input_df.
    • The DataFrame is grouped by the user_id column, and the sku_count values are summed up for each user, resulting in a new DataFrame called aggregated_df.
    • The aggregated DataFrame is written back to a specified S3 output path in Parquet format using the overwrite mode to replace any existing data at that location.

    Kafka Consumer 

    Write a Kafka consumer using Python to read messages of user activity and process them.

from kafka import KafkaConsumer
import json

# Create the Kafka consumer
consumer = KafkaConsumer(
    'user_activity',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='user_activity_group',
    value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)

for message in consumer:
    user_activity = message.value
    # Perform processing (e.g., store to a database, run analytics)
    print(user_activity)

    This Python code snippet uses the KafkaConsumer class from the kafka-python library to consume messages from a Kafka topic. Here’s a breakdown of the code:

    • A KafkaConsumer object is created to listen to the user_activity topic on a Kafka broker running at localhost:9092. The auto_offset_reset=’earliest’ parameter ensures that the consumer starts reading from the earliest available message if no previous offsets are committed.
    • The enable_auto_commit=True setting allows the consumer to automatically commit the offsets of the messages it has processed. The group_id=’user_activity_group’ specifies the consumer group to which this consumer belongs, allowing for load balancing among multiple consumers.
    • The value_deserializer parameter specifies a lambda function to decode the message values from JSON format, converting them into Python dictionaries.
    • The code enters an infinite loop to continuously read messages from the user_activity topic. Each received message is processed, with the value being accessed through message.value.
    • Each user_activity message is printed to the console, allowing real-time monitoring of user activity data.

    Spark Streaming Job

    How would you implement a Spark Streaming job that listens to Kafka events and writes to Cassandra?

    This PySpark code snippet establishes a streaming data pipeline that reads events from a Kafka topic and writes them to a Cassandra database:

    • A Spark session named KafkaToCassandra is created, which is essential for working with DataFrames and streaming data in Spark.
    • The readStream method is used to create a streaming DataFrame (kafkaStream) that reads data from the Kafka topic named events, connecting to a Kafka broker at localhost:9092.
    • The code uses the from_json function, with a schema describing the event payload, to parse the JSON data contained in the Kafka message values and creates a new column called event_data. The transformed DataFrame (transformed_df) is then constructed by selecting relevant fields from the parsed JSON, specifically user_id, event_timestamp, and event_type.
    • The transformed DataFrame is written to a Cassandra database. The writeStream method specifies that the output format is Cassandra, targeting the user_ks keyspace and the user_events table. The stream starts with the start() method which initiates the continuous data ingestion process.

from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, TimestampType

# Create the Spark session
spark = (SparkSession.builder
    .appName("KafkaToCassandra")
    .getOrCreate())

# Schema of the JSON payload carried in the Kafka message value
event_schema = StructType([
    StructField("user_id", StringType()),
    StructField("event_timestamp", TimestampType()),
    StructField("event_type", StringType()),
])

# Read the data stream from Kafka
kafkaStream = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "events")
    .load())

# Transformation logic: parse the JSON value into columns
kafkaStream = kafkaStream.withColumn(
    "event_data", from_json(col("value").cast("string"), event_schema)
)
transformed_df = kafkaStream.select(
    col("event_data.user_id"),
    col("event_data.event_timestamp"),
    col("event_data.event_type")
)

# Write to Cassandra using the Spark-Cassandra Connector
(transformed_df.writeStream
    .format("org.apache.spark.sql.cassandra")
    .option("keyspace", "user_ks")
    .option("table", "user_events")
    .option("checkpointLocation", "/path/to/checkpoint")  # required for streaming sinks
    .start())

    Flink Job Processing

    Design a Flink job for processing sensor data in real-time and trigger alerts for anomalies.

    The following Python code snippet uses Apache Flink to create a streaming application that detects anomalies in sensor data based on temperature readings:

    • A StreamExecutionEnvironment is instantiated using get_execution_environment(), which serves as the context for executing the streaming application.
    • The add_source method is called to create a data stream (sensor_data_stream) from a user-defined source function (your_source_function()), which is expected to generate sensor data.
    • The detect_anomaly function is defined to check if the temperature in the incoming sensor data exceeds a predefined threshold. If an anomaly is detected, it prints a message indicating the sensor and its data.
    • The filter method is applied to the sensor_data_stream using the detect_anomaly function. This results in a new stream (processed_data_stream) that only contains the sensor data where anomalies have been detected.
    • The print method is called on processed_data_stream to output the filtered data to the console, allowing for real-time monitoring of detected anomalies.
    • The execute method is invoked with the application name Sensor Anomaly Detection, which starts the streaming job and initiates the anomaly detection process.

from pyflink.datastream import StreamExecutionEnvironment

# Create the execution environment
env = StreamExecutionEnvironment.get_execution_environment()

# Source: stream data from sensors (placeholder source function)
sensor_data_stream = env.add_source(your_source_function())

# Process: identify temperature anomalies in the sensor data
def detect_anomaly(sensor_data):
    # Keep only readings above the threshold and print an alert for each
    if sensor_data['temperature'] > threshold:
        print(f"Anomaly detected in sensor: {sensor_data}")
        return True
    return False

processed_data_stream = sensor_data_stream.filter(detect_anomaly)

# Sink: trigger the alerting system (or log)
processed_data_stream.print()

# Execute the streaming application
env.execute("Sensor Anomaly Detection")

    Large-scale Distributed Join Operation

    Explain implementing a large-scale distributed join operation without OOM using Partitioning in Spark.

    The code ensures that the join operation is optimized through repartitioning, which is crucial for handling large datasets in distributed data processing applications:

    • The first two lines repartition table1 and table2 DataFrames into 100 partitions, using the column join_key as the partitioning key. This step helps optimize the subsequent join operation by ensuring that rows with the same join_key are located in the same partition, which can significantly improve performance.
    • The join method is called on table1_partitioned, joining it with table2_partitioned on the common column join_key. The result is a new DataFrame, joined_df, which contains rows where the join_key values match in both DataFrames.
    • The write method is used to save the joined_df DataFrame in Parquet format at the specified output path (/path_to_output). Parquet is a columnar storage format that is efficient for both storage and query performance.

# Hash partition both tables on the same key
table1_partitioned = table1.repartition(100, "join_key")
table2_partitioned = table2.repartition(100, "join_key")

# Perform the join operation efficiently
joined_df = table1_partitioned.join(table2_partitioned, "join_key")

# Write the joined result
joined_df.write.parquet("/path_to_output")

    Implementing Different Join Strategies

    Write a Spark job that demonstrates how to force Spark to use a broadcast join and a sort-merge join when joining two DataFrames.

    • By using broadcast(df_small), we force Spark to use a BroadcastHashJoin.
    • Disabling spark.sql.autoBroadcastJoinThreshold enforces a SortMergeJoin for larger tables.

from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast

spark = SparkSession.builder.appName("JoinStrategies").getOrCreate()

# Create sample DataFrames
df_large = spark.range(1000000).withColumnRenamed("id", "key")
df_small = spark.range(100).withColumnRenamed("id", "key")

# Broadcast join (forces a broadcast join for the smaller DataFrame)
df_broadcast_join = df_large.join(broadcast(df_small), on="key")
print("Broadcast Join Plan:")
df_broadcast_join.explain()  # Look for 'BroadcastHashJoin' in the physical plan

# Sort-merge join (forces a sort-merge join by disabling the broadcast threshold)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
df_sort_merge_join = df_large.join(df_small, on="key")
print("Sort-Merge Join Plan:")
df_sort_merge_join.explain()  # Look for 'SortMergeJoin' in the physical plan

    SPARK Repartition vs Coalesce

    Write a code example demonstrating how to use repartition and coalesce to modify the number of partitions for a DataFrame in Spark.

    • repartition performs a full shuffle and increases or decreases the number of partitions.
    • coalesce reduces partitions without a full shuffle, which is efficient for downscaling partitions.

df = spark.range(100000)

# Use repartition to increase the number of partitions to 20 (full shuffle)
df_repartitioned = df.repartition(20)
print(f"Number of partitions after repartition: {df_repartitioned.rdd.getNumPartitions()}")

# Use coalesce to reduce the number of partitions to 5 (no full shuffle)
df_coalesced = df_repartitioned.coalesce(5)
print(f"Number of partitions after coalesce: {df_coalesced.rdd.getNumPartitions()}")

    Bucketing and Partitioning

    Create a DataFrame and demonstrate how to write it using both bucketing and partitioning. Explain how each affects file storage.

    • Bucketing distributes data across fixed buckets but doesn’t create subdirectories.
    • Partitioning creates folders for each unique value in the partitioned columns.

data = [("Alice", "Math", 85), ("Bob", "English", 90), ("Alice", "Science", 95)]
df = spark.createDataFrame(data, ["name", "subject", "score"])

# Write with bucketing
df.write.bucketBy(5, "name").saveAsTable("bucketed_table")

# Write with partitioning
df.write.partitionBy("subject").mode("overwrite").parquet("/tmp/partitioned_table")

# Verify how each was stored
print("Bucketed Table Structure:")
spark.sql("DESCRIBE EXTENDED bucketed_table").show(truncate=False)  # Bucketing appears in the table metadata, not as subdirectories

print("Partitioned Table Directory Structure:")
spark.read.parquet("/tmp/partitioned_table").show()  # One subdirectory is created per subject value

    Map Side Join Using Broadcast

    Implement a map-side join using a broadcast join in Spark to optimize joining a small lookup DataFrame with a large DataFrame.

    • Broadcasting df_lookup ensures a map-side join, eliminating the need for shuffling and making the join more efficient.

from pyspark.sql.functions import broadcast

# Large DataFrame
df_large = spark.range(1000000).withColumnRenamed("id", "user_id")

# Small lookup DataFrame
df_lookup = spark.createDataFrame([(1, "Gold"), (2, "Silver"), (3, "Bronze")], ["user_id", "membership"])

# Perform a map-side join using broadcast
df_joined = df_large.join(broadcast(df_lookup), "user_id", "left")
df_joined.show()

    Identify and Handle Skewed Data

    Write a Spark job to detect skewness in a DataFrame by calculating the distribution of a specific column. Then, handle skewness by applying repartitionByRange.

    • Skewness is detected by grouping and counting occurrences of each key.
    • Using repartitionByRange helps balance partitions and reduce skewness.

from pyspark.sql.functions import col

# Sample DataFrame with skewed data
data = [(1, "A"), (1, "B"), (1, "C"), (2, "D"), (3, "E")]
df_skewed = spark.createDataFrame(data, ["key", "value"])

# Calculate the distribution to detect skewness
df_skewed.groupBy("key").count().orderBy(col("count").desc()).show()

# Repartition by range to manage skewness
df_balanced = df_skewed.repartitionByRange(3, "key")
print(f"Partition sizes after repartitionByRange: {df_balanced.rdd.glom().map(len).collect()}")

    Handle Data Skew with Salting

    Write a code example to handle skewed data by applying salting before joining two DataFrames.

    • Adding a random salt column to the skewed side spreads the hot key across partitions; the other side is replicated once per salt value so that matching rows still meet during the join.

from pyspark.sql.functions import rand, explode, array, lit

# Original skewed DataFrames (key 1 is heavily skewed)
df1 = spark.createDataFrame([(1, "A"), (1, "B"), (2, "C")], ["key", "value1"])
df2 = spark.createDataFrame([(1, "D"), (1, "E"), (2, "F")], ["key", "value2"])

num_salts = 3  # salt range

# Add a random salt to the skewed side to distribute the hot key across partitions
df1_salted = df1.withColumn("salt", (rand() * num_salts).cast("int"))

# Replicate the other side once per salt value so every pairing still matches
df2_salted = df2.withColumn("salt", explode(array(*[lit(i) for i in range(num_salts)])))

# Join on both the key and the salt to reduce skew, then drop the helper column
df_joined = df1_salted.join(df2_salted, ["key", "salt"], "inner").drop("salt")
df_joined.show()

    Static vs Dynamic Partitions

    Write a Spark job to demonstrate how to use both static and dynamic partitioning while writing a DataFrame.

    • Static Partitioning: Partitions are manually specified before writing.
    • Dynamic Partitioning: Spark auto-creates folders for each unique value of the partition column(s).

    hive.exec.dynamic.partition.mode should be set to “nonstrict” to enable dynamic partitioning.

data = [("Alice", "2023-01", 85), ("Bob", "2023-02", 90), ("Alice", "2023-01", 95)]
df = spark.createDataFrame(data, ["name", "date", "score"])

# Static partitioning: the partition column is specified up front
df.write.mode("overwrite").partitionBy("date").parquet("/tmp/static_partitioned_table")

# Dynamic partitioning: Spark creates a folder per unique (name, date) combination
spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")
df.write.mode("overwrite").partitionBy("name", "date").parquet("/tmp/dynamic_partitioned_table")

    Interview Tips

Dimensional modeling is important for data warehousing: it makes data storage, retrieval, and analysis more efficient.

    At the core of dimensional modeling are facts and dimensions. Facts represent the key measurements or metrics derived from business processes, such as sales or revenue figures.

In contrast, dimensions provide context for these facts, offering descriptive information, such as customer details or time data, that allows the facts to be sliced and analyzed.

    Attributes define the characteristics of these dimensions, adding granularity to the model.

    Within this structure, a fact table serves as the central table, containing the main quantitative data, while dimension tables provide the surrounding context.

    Facts can be categorized into three types:

    • additive (e.g., sales amounts, which can be summed across any dimension),
    • non-additive (e.g., ratios or percentages, which cannot be meaningfully summed),
    • and semi-additive (e.g., account balances, which can be summed across some dimensions but not across time).
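
A small PySpark sketch of these ideas (illustrative only; the tables and columns are invented for this example): the fact table holds the additive measure sale_amount, the dimension table holds descriptive customer attributes, and joining them lets the fact be sliced by any attribute.

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("star_schema_demo").getOrCreate()

# Dimension table: descriptive context (customer attributes)
dim_customer = spark.createDataFrame(
    [(1, "Alice", "US"), (2, "Bob", "DE")],
    ["customer_id", "customer_name", "country"],
)

# Fact table: quantitative measurements keyed by the dimension's id
fact_sales = spark.createDataFrame(
    [(100, 1, "2024-01-01", 250.0), (101, 2, "2024-01-01", 90.0), (102, 1, "2024-01-02", 40.0)],
    ["sale_id", "customer_id", "sale_date", "sale_amount"],
)

# sale_amount is additive, so it can be summed across any dimension, here by country
(fact_sales
    .join(dim_customer, "customer_id")
    .groupBy("country")
    .agg(F.sum("sale_amount").alias("total_sales"))
    .show())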

    We hope this gives you a good sense of what to expect in your data engineering interviews.

    Interview Prep

    • To dive deeper, check out our data engineering course, which includes numerous mock interviews and practice lessons.
    • You can also schedule a free mock interview to hone your skills with peers.
    • For more personalized support, get interview coaching from engineers at top companies.

    Best of luck with your upcoming interview!