def data_cleansing_customer_id(spark: SparkSession, df: DataFrame) -> DataFrame:
    """Fill nulls in 'customer_id' and normalize string IDs to lowercase."""
    from pyspark.sql.functions import col, lower
    from pyspark.sql.types import StringType, IntegerType, FloatType, DoubleType, LongType, ShortType
    transformed_columns = []
    # Guard against the column having been dropped by an upstream null-handling step.
    if "customer_id" not in df.columns:
        print("Warning: Column 'customer_id' not found after null operations. Skipping transformations for this column.")
    else:
        col_type = df.schema["customer_id"].dataType
        if isinstance(col_type, StringType):
            # String IDs: replace nulls with a sentinel, then lowercase for consistency.
            df = df.na.fill({"customer_id": "NA"})
            transformed_columns = [lower(col("customer_id")).alias("customer_id")]
        elif isinstance(col_type, (IntegerType, FloatType, DoubleType, LongType, ShortType)):
            # Numeric IDs: replace nulls with 0 and pass the values through unchanged.
            df = df.na.fill({"customer_id": 0})
            transformed_columns = [col("customer_id")]
        else:
            # Any other type (date, struct, ...): leave the column untouched.
            transformed_columns = [col("customer_id")]
    # Reapply the (possibly transformed) column in its original position rather
    # than moving 'customer_id' to the end of the schema.
    df = df.select(*[transformed_columns[0] if c == "customer_id" else col(c) for c in df.columns])
    return df
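

# A minimal usage sketch, not part of the pipeline itself: it builds a local
# SparkSession and a toy one-column DataFrame to show the null-fill and
# lowercasing behaviour on a StringType 'customer_id'. The app name and sample
# values are illustrative assumptions.
if __name__ == "__main__":
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("data_cleansing_demo").getOrCreate()
    sample = spark.createDataFrame(
        [("ABC-001",), (None,), ("Xyz-002",)],
        ["customer_id"],
    )
    cleaned = data_cleansing_customer_id(spark, sample)
    # Expected output: null replaced with "NA", all values lowercased.
    cleaned.show()
    spark.stop()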