Below are some commonly used PySpark DataFrame operations, categorized by functionality:
Data Filtering and Selection:
- filter(): Filtering rows based on a condition.
- select(): Selecting specific columns.
- where(): Alias for the filter() method.
- dropDuplicates(): Removing duplicate rows.
- drop(): Dropping specific columns.
- distinct(): Fetching distinct rows.
Data Aggregation and Grouping:
- groupBy(): Grouping data based on one or more columns.
- agg(): Aggregating data using aggregate functions (sum, avg, max, min, count, etc.).
- count(): Counting rows in a DataFrame.
- approxQuantile(): Computing approximate quantiles.
- pivot(): Pivoting data based on a column.
- rollup() and cube(): Hierarchical aggregation.
Joins and Combining DataFrames:
- join(): Performing inner, outer, left, and right joins.
- union(): Concatenating two DataFrames vertically.
- intersect(): Finding common rows between two DataFrames.
- exceptAll(): Finding rows present in the first DataFrame but not in the second.
Sorting and Ordering:
- orderBy() or sort(): Sorting rows based on column(s).
- sortWithinPartitions(): Sorting within each partition.
- repartition(): Changing the number of partitions.
- coalesce(): Reducing the number of partitions.
Data Transformation:
- withColumn(): Adding or updating columns based on transformations.
- cast(): Changing the data types of columns.
- fillna(): Filling missing or null values.
- replace(): Replacing specific values in columns.
- when() and otherwise(): Conditional transformations.
Statistical Operations:
- describe(): Descriptive statistics of columns.
- corr(): Calculating the correlation between columns.
- approxQuantile(): Calculating quantiles.
- sample(): Sampling data from a DataFrame.
- crosstab(): Creating contingency tables.
Date and Time Operations:
- withColumn(): Extracting components like year, month, and day from timestamps.
- to_date(), to_timestamp(): Converting strings to date or timestamp format.
- datediff(): Calculating the difference between dates.
- window(): Defining windows for aggregations.
String Manipulation:
- split(): Splitting strings into arrays.
- concat(): Concatenating columns or strings.
- substring(): Extracting substrings.
- upper(), lower(), trim(): Changing case and removing spaces.
User-Defined Functions (UDFs):
- udf(): Creating user-defined functions.
- Applying UDFs on DataFrame columns for custom transformations, as sketched below.
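A minimal sketch of defining and applying a UDF (the column name 'name' and the logic are illustrative):
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Wrap a plain Python function as a UDF with an explicit return type
upper_udf = udf(lambda s: s.upper() if s is not None else None, StringType())
df_upper = df.withColumn('name_upper', upper_udf(df['name']))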
Handling Missing Values:
- na.drop(): Dropping rows with null values.
- na.fill(): Filling null values.
- na.replace(): Replacing specific values.
Window Functions:
- over(): Specifying window functions for aggregations.
- Window functions like rank(), dense_rank(), lead(), lag().
Aggregation Functions:
- sum(), avg(), max(), min(): Aggregating data.
- collect_list(), collect_set(): Collecting values into lists or sets.
Mathematical Functions:
- sqrt(), log(), exp(), abs(): Mathematical transformations.
- rand(), randn(): Generating random numbers.
Type Conversion:
- cast(): Converting column types.
- withColumn(): Type conversion using expressions.
Splitting and Exploding Arrays:
- split(): Splitting string columns into arrays.
- explode(): Exploding arrays into rows (see the sketch below).
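For example, splitting a comma-separated string column and exploding it into one row per element (the 'tags' column is illustrative):
from pyspark.sql.functions import split, explode

# Build an array column, then emit one output row per array element
tags_df = df.withColumn('tag_array', split(df['tags'], ','))
exploded_df = tags_df.withColumn('tag', explode(tags_df['tag_array']))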
Handling DataFrame Metadata:
- printSchema(): Printing the schema of a DataFrame.
- schema: Accessing the DataFrame schema (a property, not a method).
- withMetadata(): Adding metadata to DataFrame columns.
Data Profiling:
- summary(): Generating summary statistics.
- approxQuantile(): Profiling quantiles.
- approxCountDistinct(): Estimating distinct counts.
Working with Null Values:
- isNull(), isNotNull(): Filtering null or non-null values.
- na.replace(): Replacing null values.
Handling Structured Data:
- Working with nested structures.
- struct(): Creating struct columns (see the sketch below).
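A short sketch, assuming illustrative columns 'first_name' and 'last_name':
from pyspark.sql.functions import struct

# Combine two flat columns into a single struct column
nested_df = df.withColumn('full_name', struct(df['first_name'], df['last_name']))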
Applying SQL Operations:
- createOrReplaceTempView(): Creating temporary views for SQL queries.
- spark.sql(): Executing SQL queries on DataFrames (see the sketch below).
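A minimal sketch (the view and column names are illustrative):
# Register the DataFrame as a temporary view, then query it with SQL
df.createOrReplaceTempView('my_table')
result_df = spark.sql('SELECT column_name, COUNT(*) AS cnt FROM my_table GROUP BY column_name')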
Advanced Operations:
- map(): Transforming each row using a function (available on the underlying RDD via df.rdd).
- foreach(): Applying a function to each row.
Windowing Operations:
- partitionBy(): Specifying partition columns for window operations.
- orderBy(): Defining the order within partitions.
Advanced Joins:
- broadcast(): Broadcasting smaller DataFrames for optimization.
- hint(): Providing join hints for optimization.
Handling Complex Types:
- getItem(): Accessing elements in complex types like arrays and maps.
- getField(): Accessing fields in struct columns (see the sketch below).
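For example, assuming an array column 'scores' and a struct column 'address' (both illustrative):
# Index into an array column and reach into a struct column
first_score_df = df.withColumn('first_score', df['scores'].getItem(0))
city_df = df.withColumn('city', df['address'].getField('city'))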
Working with JSON Data:
- get_json_object(): Extracting JSON objects from strings.
- from_json(), to_json(): Converting DataFrame columns to/from JSON (see the sketch below).
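A sketch of parsing and producing JSON, assuming a string column 'json_col' holding objects like {"name": ..., "age": ...}:
from pyspark.sql.functions import from_json, to_json, get_json_object
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Parse JSON strings into a struct column, pull out one field, and serialize back
json_schema = StructType([StructField('name', StringType()), StructField('age', IntegerType())])
parsed_df = df.withColumn('parsed', from_json(df['json_col'], json_schema))
name_df = df.withColumn('name', get_json_object(df['json_col'], '$.name'))
roundtrip_df = parsed_df.withColumn('json_again', to_json(parsed_df['parsed']))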
Binary Operations:
- Working with binary data.
- base64(), unbase64(): Encoding/decoding binary data (see the sketch below).
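For example (the binary column 'payload' and the base64 string column 'encoded' are illustrative):
from pyspark.sql.functions import base64, unbase64

# base64() yields a string column; unbase64() yields a binary column
encoded_df = df.withColumn('payload_b64', base64(df['payload']))
decoded_df = df.withColumn('payload_bytes', unbase64(df['encoded']))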
Handling Timezone and Date Operations:
- from_utc_timestamp(), to_utc_timestamp(): Converting timestamps to and from UTC.
- current_timestamp(): Getting the current timestamp (see the sketch below).
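A short sketch (the 'utc_ts' column and the timezone are illustrative):
from pyspark.sql.functions import from_utc_timestamp, to_utc_timestamp, current_timestamp

# Shift a UTC timestamp into a local zone and back, and stamp the current time
local_df = df.withColumn('local_ts', from_utc_timestamp(df['utc_ts'], 'America/New_York'))
utc_df = local_df.withColumn('back_to_utc', to_utc_timestamp(local_df['local_ts'], 'America/New_York'))
stamped_df = df.withColumn('now', current_timestamp())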
Handling Parquet and Avro Formats:
- Reading and writing data in Parquet and Avro formats.
- read.parquet(), write.parquet().
- read.format('avro'), write.format('avro') (see the sketch below).
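For example (the paths are illustrative, and Avro support requires the external spark-avro package):
# Parquet is built in; Avro goes through the generic format/load/save API
parquet_df = spark.read.parquet('/path/to/input.parquet')
parquet_df.write.parquet('/path/to/output.parquet')
avro_df = spark.read.format('avro').load('/path/to/input.avro')
avro_df.write.format('avro').save('/path/to/output.avro')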
Approximate Algorithms:
- Approximate algorithms for data sketches.
- approxCountDistinct(), approxQuantile() (example below).
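For example, estimating the number of distinct values in a column (the column name is illustrative):
from pyspark.sql.functions import approx_count_distinct

# A sketch-based estimate, much cheaper than an exact countDistinct on large data
distinct_estimate_df = df.agg(approx_count_distinct('user_id'))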
Structured Streaming Operations:
- readStream, writeStream: Streaming operations (see the sketch below).
- window() with streaming data.
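A minimal streaming sketch, assuming a socket source on localhost:9999 and console output (both illustrative):
# Read a text stream and echo each micro-batch to the console
stream_df = spark.readStream.format('socket').option('host', 'localhost').option('port', 9999).load()
query = stream_df.writeStream.format('console').outputMode('append').start()
query.awaitTermination()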
Handling Large Datasets:
- Optimizations for handling large datasets.
- Setting appropriate memory and execution configurations.
Dynamic Partitioning:
- Dynamic partitioning strategies.
- bucketBy(): Bucketing strategies (see the sketch below).
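For example, writing a bucketed table (the table and column names are illustrative; bucketBy() requires saveAsTable()):
# Hash-bucket rows by user_id into 8 buckets in a managed table
df.write.bucketBy(8, 'user_id').sortBy('user_id').saveAsTable('bucketed_users')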
Handling Skewed Data:
- Strategies for handling skewed data distribution.
- skewness(), kurtosis() for analysis.
Machine Learning Integration:
- Feature engineering using DataFrame operations.
- Data preprocessing for MLlib.
Integration with External Systems:
- Reading from and writing to external databases.
- jdbc() for connecting to JDBC databases.
- writeStream.format("kafka") for streaming data to Kafka (see the sketch below).
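A sketch of a JDBC read and a Kafka streaming write (the URL, table, topic, and credentials are illustrative; stream_df is assumed to come from a streaming source, and the Kafka sink needs the spark-sql-kafka package):
# Read a table over JDBC
jdbc_df = spark.read.jdbc(url='jdbc:postgresql://host:5432/db', table='schema.my_table', properties={'user': 'user', 'password': 'secret'})
# Stream key/value pairs to a Kafka topic; the sink requires a checkpoint location
kafka_query = stream_df.selectExpr('CAST(key AS STRING)', 'CAST(value AS STRING)').writeStream.format('kafka').option('kafka.bootstrap.servers', 'host:9092').option('topic', 'my_topic').option('checkpointLocation', '/tmp/checkpoint').start()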
Handling Outliers:
- percentile_approx(): Dealing with outliers (see the sketch after this list).
- Advanced statistical functions for outlier detection.
- Customized transformations for outlier treatment.
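For example, computing approximate quartiles as an aggregate (the column name is illustrative; percentile_approx() is available from Spark 3.1):
from pyspark.sql.functions import percentile_approx

# Approximate 25th/50th/75th percentiles, useful for flagging outliers
quartiles_df = df.agg(percentile_approx('value_col', [0.25, 0.5, 0.75]))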
These operations cover a wide range of functionality and can be combined to manipulate, preprocess, and transform data effectively using PySpark DataFrames in a distributed computing environment. The choice of operations depends on the specific data processing requirements.
Some code examples along these lines are given below.
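The snippets assume a running SparkSession named spark, existing DataFrames (df, df1, df2, small_df, and a pandas DataFrame pandas_df), and imports along these lines:
from pyspark.sql import Window
from pyspark.sql.functions import (col, lit, when, concat, concat_ws, split,
    regexp_replace, date_format, from_unixtime, datediff, mean, avg, stddev,
    sum, first, countDistinct, skewness, kurtosis, explode, array,
    row_number, rank, broadcast)
from pyspark.sql.types import IntegerType
# Note: importing sum from pyspark.sql.functions shadows Python's built-in sum here.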
# Filter data based on a condition
filtered_df = df.filter(df['column_name'] > 100)
# Select specific columns
selected_df = df.select('column_name_1', 'column_name_2')
# Apply aggregate functions
aggregated_df = df.groupBy('column_name').agg({'column_to_aggregate': 'sum'})
# Join two DataFrames
joined_df = df1.join(df2, df1['common_column'] == df2['common_column'], 'inner')
# Sort data based on a column
sorted_df = df.orderBy('column_name', ascending=False)
# Fill null values with a specific value
filled_df = df.fillna({'column_name': 'default_value'})
# Apply a function to each row
def my_function(row):
    # Perform operations on the row
    return row
mapped_df = df.rdd.map(my_function)
# Add a new column or modify an existing one
modified_df = df.withColumn('new_column', df['existing_column'] * 2)
# Take a sample of the DataFrame
sampled_df = df.sample(withReplacement=False, fraction=0.5)
# Dropping Columns:
dropped_df = df.drop('column_to_drop')
# Distinct Values:
distinct_df = df.select('column_name').distinct()
# Grouping and Counting:
grouped_count_df = df.groupBy('column_name').count()
# Concatenating Columns:
concatenated_df = df.withColumn('concatenated_col', concat(df['col1'], lit('_'), df['col2']))
#String Operations:
modified_string_df = df.withColumn('new_string_col', regexp_replace(df['text_col'], 'pattern', 'replacement'))
# Date Operations:
formatted_date_df = df.withColumn('formatted_date', date_format(df['date_col'], 'yyyy-MM-dd'))
# Filtering by Date Range:
date_filtered_df = df.filter((df['date_col'] >= '2023-01-01') & (df['date_col'] <= '2023-12-31'))
# Aggregating and Calculating Mean:
mean_df = df.groupBy('group_col').agg(mean('numeric_col'))
# Calculating Standard Deviation:
stddev_df = df.groupBy('group_col').agg(stddev('numeric_col'))
# Calculating Percentiles:
percentile_df = df.approxQuantile('numeric_col', [0.25, 0.5, 0.75], 0.01)
# Pivot and Pivot Aggregation:
pivoted_df = df.groupBy('key_col').pivot('pivot_col').agg(sum('value_col'))
# String Splitting:
split_df = df.withColumn('split_col', split(df['text_col'], 'delimiter'))
# Handling Missing Values:
df = df.dropna() # Drop rows with any null value
# Casting Data Types:
typed_df = df.withColumn('new_col', df['old_col'].cast(IntegerType()))
# Window Functions:
windowed_df = df.withColumn('avg_col', avg('value_col').over(Window.partitionBy('partition_col')))
# Handling Duplicates:
deduplicated_df = df.dropDuplicates()
# Reshaping Data - Unpivot/Melt:
unpivoted_df = df.selectExpr('id', 'stack(2, "col1", col1, "col2", col2) as (key, value)')
# Reshaping Data - Pivot/Casting Columns:
pivoted_cast_df = df.groupBy('id').pivot('key').agg(first('value'))
# Broadcasting Small DataFrames:
broadcast_df = df.join(broadcast(small_df), 'common_column')
# Caching DataFrames:
df.cache()
# Replicating Rows:
replicated_df = df.withColumn('new_col', explode(array([lit(x) for x in range(3)])))
# Calculating Row Number/Row Index:
indexed_df = df.withColumn('row_index', row_number().over(Window.orderBy('any_col')))
# Ranking and Window Functions:
ranked_df = df.withColumn('rank_col', rank().over(Window.partitionBy('partition_col').orderBy('order_col')))
# Finding Maximum Value in Column:
max_val = df.agg({'numeric_col': 'max'}).collect()[0][0]
# Union of DataFrames:
unioned_df = df1.union(df2)
# Intersect of DataFrames:
intersect_df = df1.intersect(df2)
# Except/Difference of DataFrames:
except_df = df1.exceptAll(df2)
# Sampling by Fraction:
sampled_df = df.sample(withReplacement=False, fraction=0.1)
# Splitting Data into Training and Testing Sets:
train_df, test_df = df.randomSplit([0.7, 0.3], seed=42)
# Aggregating by Window Functions:
window_agg_df = df.withColumn('windowed_sum', sum('value_col').over(Window.partitionBy('group_col').orderBy('date_col').rangeBetween(Window.unboundedPreceding, Window.currentRow)))
# String Concatenation with Separator:
concatenated_df = df.withColumn('concat_col', concat_ws(',', 'col1', 'col2'))
# Filling Nulls with Previous or Next Value:
# PySpark's fillna() has no method='bfill' argument (that is pandas); a window with
# first(..., ignorenulls=True) achieves a backfill (assumes an ordering column 'date_col'):
filled_df = df.withColumn('column_to_fill', first('column_to_fill', ignorenulls=True).over(Window.orderBy('date_col').rowsBetween(0, Window.unboundedFollowing)))
# Cross-Joining DataFrames:
cross_joined_df = df1.crossJoin(df2)
# Collecting Rows into a List:
collected_list = df.select('column_name').collect()
# Converting Pandas DataFrame to PySpark DataFrame:
spark_df = spark.createDataFrame(pandas_df)
# Aggregating Data by Rolling Window:
rolling_agg_df = df.withColumn('rolling_sum', sum('value_col').over(Window.orderBy('date_col').rowsBetween(-2, 0)))
# Calculating Cumulative Sum:
cumsum_df = df.withColumn('cumulative_sum', sum('value_col').over(Window.orderBy('date_col').rowsBetween(Window.unboundedPreceding, Window.currentRow)))
# Calculating Histogram:
# pyspark.sql.functions has no buckets() function; the RDD API provides histogram():
bucket_edges, bucket_counts = df.select('numeric_col').rdd.flatMap(lambda x: x).histogram(5)
# Converting Unix Timestamp to Date/Time:
converted_df = df.withColumn('datetime_col', from_unixtime('unix_timestamp_col'))
# Calculating Time Differences:
time_diff_df = df.withColumn('time_diff', datediff('end_time', 'start_time'))
# Combining String Columns:
combined_df = df.withColumn('combined_col', concat_ws(' ', 'col1', 'col2', 'col3'))
# Calculating Unique Values Count per Group:
unique_count_df = df.groupBy('group_col').agg(countDistinct('value_col'))
# Calculating Z-Scores:
# mean() and stddev() are aggregates, so evaluate them over a window spanning the whole DataFrame:
whole_df_window = Window.partitionBy()
z_score_df = df.withColumn('z_score', (col('value_col') - mean('value_col').over(whole_df_window)) / stddev('value_col').over(whole_df_window))
# Calculating Skewness and Kurtosis:
skewness_df = df.select(skewness('value_col'))
kurtosis_df = df.select(kurtosis('value_col'))
# Binning Continuous Data:
bin_df = df.withColumn('bin_col', when(col('value_col') <= 50, 'Bin 1').when(col('value_col') <= 100, 'Bin 2').otherwise('Bin 3'))