Commonly used PySpark DataFrame operations


Here are some commonly used PySpark DataFrame operations, grouped by functionality:

Data Filtering and Selection:

  1. filter(): Filtering rows based on a condition.
  2. select(): Selecting specific columns.
  3. where(): Alias for filter() method.
  4. dropDuplicates(): Removing duplicate rows.
  5. drop(): Dropping specific columns.
  6. distinct(): Fetching distinct rows.

Data Aggregation and Grouping:

  1. groupBy(): Grouping data based on one or more columns.
  2. agg(): Aggregating data using aggregate functions (sum, avg, max, min, count, etc.).
  3. count(): Counting rows in a DataFrame.
  4. approxQuantile(): Computing approximate quantiles.
  5. pivot(): Pivoting data based on a column.
  6. rollup() and cube(): Hierarchical aggregation.
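
For example, a minimal sketch of hierarchical aggregation (the column names region, product, and amount are placeholders):

# rollup() aggregates at (region, product), (region), and grand-total levels
rollup_df = df.rollup('region', 'product').agg({'amount': 'sum'})
# cube() additionally includes the (product)-only level
cube_df = df.cube('region', 'product').agg({'amount': 'sum'})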

Joins and Combining DataFrames:

  1. join(): Performing inner, outer, left, right joins.
  2. union(): Concatenating two DataFrames vertically.
  3. intersect(): Finding common rows between two DataFrames.
  4. exceptAll() / subtract(): Finding rows present in the first DataFrame but not in the second.

Sorting, Ordering, and Partitioning:

  1. orderBy() or sort(): Sorting rows based on column(s).
  2. sortWithinPartitions(): Sorting within each partition.
  3. repartition(): Changing the number of partitions.
  4. coalesce(): Reducing the number of partitions.

Data Transformation:

  1. withColumn(): Adding or updating columns based on transformations.
  2. cast(): Changing a column's data type (a Column method, typically used inside withColumn() or select()).
  3. fillna(): Filling missing or null values.
  4. replace(): Replacing specific values in columns.
  5. when() and otherwise(): Conditional transformations.

Statistical Operations:

  1. describe(): Descriptive statistics of columns.
  2. corr(): Calculating correlation between columns.
  3. approxQuantile(): Calculating quantiles.
  4. sample(): Sampling data from DataFrame.
  5. crosstab(): Creating contingency tables.

Date and Time Operations:

  1. year(), month(), dayofmonth(): Extracting components such as year, month, and day from dates and timestamps (typically via withColumn()).
  2. to_date(), to_timestamp(): Converting string to date or timestamp format.
  3. datediff(): Calculating the difference between dates.
  4. window(): Defining windows for aggregations.

String Manipulation:

  1. split(): Splitting strings into arrays.
  2. concat(): Concatenating columns or strings.
  3. substring(): Extracting substrings.
  4. upper(), lower(), trim(): Changing case and removing spaces.

User-Defined Functions (UDFs):

  1. udf(): Creating user-defined functions.
  2. Applying UDFs on DataFrame columns for custom transformations.
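
A minimal UDF sketch (the column name text_col is a placeholder; built-in functions are preferred where available because Python UDFs are slower):

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Wrap a Python function as a UDF and apply it to a column
upper_udf = udf(lambda s: s.upper() if s is not None else None, StringType())
udf_df = df.withColumn('upper_col', upper_udf(df['text_col']))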

Handling Missing Values:

  1. na.drop(): Dropping rows with null values.
  2. na.fill(): Filling null values.
  3. na.replace(): Replacing specific values.

Window Functions:

  1. over(): Specifying window functions for aggregations.
  2. Window functions like rank(), dense_rank(), lead(), lag().

Aggregation Functions:

  1. sum(), avg(), max(), min(): Aggregating data.
  2. collect_list(), collect_set(): Collecting values into lists or sets.
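
For example (group_col and value_col are placeholders):

from pyspark.sql.functions import collect_list, collect_set
# collect_list keeps duplicates; collect_set de-duplicates
lists_df = df.groupBy('group_col').agg(collect_list('value_col'), collect_set('value_col'))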

Mathematical Functions:

  1. sqrt(), log(), exp(), abs(): Mathematical transformations.
  2. rand(), randn(): Generating random numbers.

Type Conversion:

  1. cast(): Converting column types.
  2. withColumn(): Type conversion using expressions.

Splitting and Exploding Arrays:

  1. split(): Splitting string columns into arrays.
  2. explode(): Exploding arrays into rows.

Handling DataFrame Metadata:

  1. printSchema(): Printing the schema of DataFrame.
  2. schema(): Accessing DataFrame schema.
  3. withMetadata(): Adding metadata to DataFrame columns.

Data Profiling:

  1. summary(): Generating summary statistics.
  2. approxQuantile(): Profiling quantiles.
  3. approx_count_distinct(): Estimating the distinct count (approxCountDistinct() is the deprecated alias).
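
For example:

from pyspark.sql.functions import approx_count_distinct
# Summary statistics for the DataFrame's numeric columns
df.summary().show()
# Approximate number of distinct values in a column
distinct_estimate_df = df.agg(approx_count_distinct('column_name'))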

Working with Null Values:

  1. isNull(), isNotNull(): Filtering null or non-null values.
  2. na.replace(): Replacing specific values (nulls themselves are filled with na.fill() / fillna()).

Handling Structured Data:

  1. Working with nested structures.
  2. struct(): Creating struct columns.
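
For instance, a sketch of building a nested struct column (the field names are placeholders):

from pyspark.sql.functions import struct
nested_df = df.withColumn('address', struct('street', 'city', 'zip'))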

Applying SQL Operations:

  1. createOrReplaceTempView(): Creating temporary views for SQL queries.
  2. spark.sql(): Executing SQL queries on DataFrames.
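
For example (assuming an active SparkSession named spark):

# Register the DataFrame as a temporary view and query it with SQL
df.createOrReplaceTempView('my_table')
sql_df = spark.sql('SELECT column_name, COUNT(*) AS cnt FROM my_table GROUP BY column_name')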

Advanced Operations:

  1. rdd.map(): Transforming each row with a Python function via the underlying RDD.
  2. foreach(): Applying a function on each row.

Windowing Operations:

  1. partitionBy(): Specifying partition columns for window operations.
  2. orderBy(): Defining the order within partitions.

Advanced Joins:

  1. broadcast(): Broadcasting smaller DataFrames for optimization.
  2. hint(): Providing join hints for optimization.

Handling Complex Types:

  1. getItem(): Accessing elements in complex types like arrays and maps.
  2. getField(): Accessing fields in struct columns.
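
A small sketch, assuming an array column, a map column, and a struct column with the names shown:

from pyspark.sql.functions import col
first_item_df = df.withColumn('first_item', col('array_col').getItem(0))
map_value_df = df.withColumn('map_value', col('map_col').getItem('some_key'))
city_df = df.withColumn('city', col('address').getField('city'))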

Working with JSON Data:

  1. get_json_object(): Extracting JSON objects from strings.
  2. from_json(), to_json(): Converting DataFrame columns to/from JSON.
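
A sketch of parsing and producing JSON (the schema and column names are assumptions):

from pyspark.sql.functions import from_json, to_json, get_json_object, struct
from pyspark.sql.types import StructType, StructField, StringType

json_schema = StructType([StructField('name', StringType()), StructField('city', StringType())])
parsed_df = df.withColumn('parsed', from_json('json_col', json_schema))
name_df = df.withColumn('name', get_json_object('json_col', '$.name'))
serialized_df = df.withColumn('as_json', to_json(struct('col1', 'col2')))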

Binary Operations:

  1. Working with binary data.
  2. base64(), unbase64(): Encoding binary data to and decoding it from Base64 strings.
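
For example (binary_col is a placeholder):

from pyspark.sql.functions import base64, unbase64
encoded_df = df.withColumn('encoded', base64('binary_col'))
decoded_df = encoded_df.withColumn('decoded', unbase64('encoded'))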

Handling Timezone and Date Operations:

  1. from_utc_timestamp(), to_utc_timestamp(): Converting timestamps to UTC.
  2. current_timestamp(): Getting the current timestamp.
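
For example (the timezone string and column names are assumptions):

from pyspark.sql.functions import from_utc_timestamp, to_utc_timestamp, current_timestamp
local_df = df.withColumn('local_time', from_utc_timestamp('utc_ts_col', 'America/New_York'))
utc_df = df.withColumn('utc_time', to_utc_timestamp('local_ts_col', 'America/New_York'))
stamped_df = df.withColumn('loaded_at', current_timestamp())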

Handling Parquet and Avro Formats:

  1. Reading and writing data in Parquet and Avro formats.
  2. spark.read.parquet(), df.write.parquet().
  3. spark.read.format('avro'), df.write.format('avro') (requires the spark-avro package).
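
A sketch with placeholder paths (Avro support requires the external spark-avro package):

parquet_df = spark.read.parquet('/path/to/data.parquet')
parquet_df.write.mode('overwrite').parquet('/path/to/parquet_output')
avro_df = spark.read.format('avro').load('/path/to/data.avro')
avro_df.write.format('avro').save('/path/to/avro_output')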

Approximate Algorithms:

  1. Approximate algorithms for data sketches.
  2. approx_count_distinct(), approxQuantile().

Structured Streaming Operations:

  1. readStream, writeStream: Streaming operations.
  2. window() with streaming data.
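
A minimal structured-streaming sketch using the built-in rate source (the console sink and output mode are assumptions):

from pyspark.sql.functions import window
stream_df = spark.readStream.format('rate').option('rowsPerSecond', 10).load()
windowed_counts = stream_df.groupBy(window('timestamp', '1 minute')).count()
query = windowed_counts.writeStream.outputMode('complete').format('console').start()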

Handling Large Datasets:

  1. Optimizations for handling large datasets.
  2. Setting appropriate memory and execution configurations.

Dynamic Partitioning:

  1. Dynamic partitioning strategies.
  2. bucketBy(): Bucketing strategies.
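
For example (bucketBy() only works with saveAsTable(); the column, path, and table names are placeholders):

# Partition output files by a column, then bucket rows within a managed table
df.write.partitionBy('date_col').parquet('/path/to/partitioned_output')
df.write.bucketBy(8, 'user_id').sortBy('user_id').saveAsTable('bucketed_events')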

Handling Skewed Data:

  1. Strategies for handling skewed data distribution.
  2. skewness(), kurtosis() for analysis.

Machine Learning Integration:

  1. Feature engineering using DataFrame operations.
  2. Data preprocessing for MLlib.

Integration with External Systems:

  1. Reading from and writing to external databases.
  2. jdbc() for connecting to JDBC databases.
  3. writeStream.format("kafka") for streaming data to Kafka.
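
A sketch of a JDBC read and a Kafka streaming write (connection details are placeholders; streaming_df is assumed to be a streaming DataFrame, and both connectors need their packages on the classpath):

jdbc_df = (spark.read.format('jdbc')
           .option('url', 'jdbc:postgresql://host:5432/db')
           .option('dbtable', 'public.my_table')
           .option('user', 'user')
           .option('password', 'password')
           .load())

kafka_query = (streaming_df.selectExpr('to_json(struct(*)) AS value')
               .writeStream.format('kafka')
               .option('kafka.bootstrap.servers', 'host:9092')
               .option('topic', 'events')
               .option('checkpointLocation', '/path/to/checkpoint')
               .start())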

Handling Outliers:

  1. percentile_approx(): Computing approximate percentiles to define outlier thresholds.
  2. Advanced statistical functions for outlier detection.
  3. Customized transformations for outlier treatment.
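
For instance, a sketch of flagging values outside the approximate 1st-99th percentile range (the thresholds and column name are assumptions; percentile_approx() requires Spark 3.1+):

from pyspark.sql.functions import percentile_approx, col
bounds = df.agg(percentile_approx('value_col', [0.01, 0.99]).alias('p')).collect()[0]['p']
low, high = bounds[0], bounds[1]
outlier_df = df.withColumn('is_outlier', (col('value_col') < low) | (col('value_col') > high))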

These operations cover a wide array of functionalities and can be combined and applied to manipulate, preprocess, and transform data effectively using PySpark DataFrames in a distributed computing environment. The choice of operations depends on the specific data processing needs and requirements.

Some code examples along these lines follow.
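
The snippets below assume an active SparkSession named spark and imports along these lines (each snippet only needs the functions it actually uses):

from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import (col, lit, concat, concat_ws, split, regexp_replace, date_format,
                                   datediff, from_unixtime, mean, stddev, sum, avg, first, last,
                                   countDistinct, skewness, kurtosis, when, explode, array, broadcast,
                                   row_number, rank)  # note: sum, avg, etc. shadow Python's built-ins
from pyspark.sql.types import IntegerType

spark = SparkSession.builder.appName('dataframe-operations').getOrCreate()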

# Filter data based on a condition
filtered_df = df.filter(df['column_name'] > 100)

# Select specific columns
selected_df = df.select('column_name_1', 'column_name_2')

# Apply aggregate functions
aggregated_df = df.groupBy('column_name').agg({'column_to_aggregate': 'sum'})

# Join two DataFrames
joined_df = df1.join(df2, df1['common_column'] == df2['common_column'], 'inner')

# Sort data based on a column
sorted_df = df.orderBy('column_name', ascending=False)

# Fill null values with a specific value
filled_df = df.fillna({'column_name': 'default_value'})

# Apply a function to each row
def my_function(row):
    # Perform operations on row
    return row

mapped_rdd = df.rdd.map(my_function)  # returns an RDD; convert back with toDF() if a DataFrame is needed

# Add a new column or modify an existing one
modified_df = df.withColumn('new_column', df['existing_column'] * 2)

# Take a sample of the DataFrame
sampled_df = df.sample(withReplacement=False, fraction=0.5)

# Dropping Columns:
dropped_df = df.drop('column_to_drop')

# Distinct Values:
distinct_df = df.select('column_name').distinct()

# Grouping and Counting:
grouped_count_df = df.groupBy('column_name').count()

# Concatenating Columns:
concatenated_df = df.withColumn('concatenated_col', concat(df['col1'], lit('_'), df['col2']))

#String Operations:
modified_string_df = df.withColumn('new_string_col', regexp_replace(df['text_col'], 'pattern', 'replacement'))

# Date Operations:
formatted_date_df = df.withColumn('formatted_date', date_format(df['date_col'], 'yyyy-MM-dd'))

# Filtering by Date Range:
date_filtered_df = df.filter((df['date_col'] >= '2023-01-01') & (df['date_col'] <= '2023-12-31'))

# Aggregating and Calculating Mean:
mean_df = df.groupBy('group_col').agg(mean('numeric_col'))

# Calculating Standard Deviation:
stddev_df = df.groupBy('group_col').agg(stddev('numeric_col'))

# Calculating Percentiles:
percentile_df = df.approxQuantile('numeric_col', [0.25, 0.5, 0.75], 0.01)

# Pivot and Pivot Aggregation:
pivoted_df = df.groupBy('key_col').pivot('pivot_col').agg(sum('value_col'))

# String Splitting:
split_df = df.withColumn('split_col', split(df['text_col'], 'delimiter'))

# Handling Missing Values:
df = df.dropna() # Drop rows with any null value

# Casting Data Types:
typed_df = df.withColumn('new_col', df['old_col'].cast(IntegerType()))

# Window Functions:
windowed_df = df.withColumn('avg_col', avg('value_col').over(Window.partitionBy('partition_col')))

# Handling Duplicates:
deduplicated_df = df.dropDuplicates()

# Reshaping Data - Unpivot/Melt:
unpivoted_df = df.selectExpr('id', 'stack(2, "col1", col1, "col2", col2) as (key, value)')

# Reshaping Data - Pivot/Casting Columns:
pivoted_cast_df = df.groupBy('id').pivot('key').agg(first('value'))

# Broadcasting Small DataFrames:
broadcast_df = df.join(broadcast(small_df), 'common_column')

# Caching DataFrames:
df.cache()

# Replicating Rows:
replicated_df = df.withColumn('new_col', explode(array([lit(x) for x in range(3)])))

# Calculating Row Number/Row Index:
indexed_df = df.withColumn('row_index', row_number().over(Window.orderBy('any_col')))

# Ranking and Window Functions:
ranked_df = df.withColumn('rank_col', rank().over(Window.partitionBy('partition_col').orderBy('order_col')))

# Finding Maximum Value in Column:
max_val = df.agg({'numeric_col': 'max'}).collect()[0][0]

# Union of DataFrames:
unioned_df = df1.union(df2)

# Intersect of DataFrames:
intersect_df = df1.intersect(df2)

# Except/Difference of DataFrames:
except_df = df1.exceptAll(df2)

# Sampling by Fraction:
sampled_df = df.sample(withReplacement=False, fraction=0.1)

# Splitting Data into Training and Testing Sets:
train_df, test_df = df.randomSplit([0.7, 0.3], seed=42)

# Aggregating by Window Functions:
window_agg_df = df.withColumn('windowed_sum', sum('value_col').over(Window.partitionBy('group_col').orderBy('date_col').rangeBetween(Window.unboundedPreceding, Window.currentRow)))

# String Concatenation with Separator:
concatenated_df = df.withColumn('concat_col', concat_ws(',', 'col1', 'col2'))

# Filling Nulls with the Previous Non-Null Value (forward fill; PySpark's fillna() has no method argument):
filled_df = df.withColumn('column_to_fill', last('column_to_fill', ignorenulls=True).over(Window.orderBy('date_col').rowsBetween(Window.unboundedPreceding, 0)))

# Cross-Joining DataFrames:
cross_joined_df = df1.crossJoin(df2)

# Collecting Rows into a List:
collected_list = df.select('column_name').collect()

# Converting Pandas DataFrame to PySpark DataFrame:
spark_df = spark.createDataFrame(pandas_df)

# Aggregating Data by Rolling Window:
rolling_agg_df = df.withColumn('rolling_sum', sum('value_col').over(Window.orderBy('date_col').rowsBetween(-2, 0)))

# Calculating Cumulative Sum:
cumsum_df = df.withColumn('cumulative_sum', sum('value_col').over(Window.orderBy('date_col').rowsBetween(Window.unboundedPreceding, Window.currentRow)))

# Calculating a Histogram (there is no buckets() function; use the underlying RDD):
bucket_edges, bucket_counts = df.select('numeric_col').rdd.flatMap(lambda x: x).histogram(5)

# Converting Unix Timestamp to Date/Time:
converted_df = df.withColumn('datetime_col', from_unixtime('unix_timestamp_col'))

# Calculating Time Differences:
time_diff_df = df.withColumn('time_diff', datediff('end_time', 'start_time'))

# Combining String Columns:
combined_df = df.withColumn('combined_col', concat_ws(' ', 'col1', 'col2', 'col3'))

# Calculating Unique Values Count per Group:
unique_count_df = df.groupBy('group_col').agg(countDistinct('value_col'))

# Calculating Z-Scores (mean() and stddev() are aggregates, so compute them once and join back):
stats_df = df.agg(mean('value_col').alias('mean_val'), stddev('value_col').alias('stddev_val'))
z_score_df = df.crossJoin(stats_df).withColumn('z_score', (col('value_col') - col('mean_val')) / col('stddev_val'))

# Calculating Skewness and Kurtosis:
skewness_df = df.select(skewness('value_col'))
kurtosis_df = df.select(kurtosis('value_col'))

# Binning Continuous Data:
bin_df = df.withColumn('bin_col', when(col('value_col') <= 50, 'Bin 1').when(col('value_col') <= 100, 'Bin 2').otherwise('Bin 3'))


