PySpark source code that performs data transformations
Process data from incoming nodes using PySpark custom code, and manage DataFrames with the following steps:
Create a DataFrame from the incoming node data:
df = spark.sql("SELECT * FROM input_relation")
After creating the DataFrame, apply any transformations the node requires, for example:
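As a minimal illustrative sketch, assuming pyspark.sql.functions is imported as F (as in the default code below) and that the input happens to contain a hypothetical amount column:

from pyspark.sql import functions as F

## Keep rows with a positive amount and tag them with a derived column (hypothetical columns)
out_df = df.filter(F.col("amount") > 0).withColumn("status", F.lit("processed"))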
Once the transformations are complete, register the new DataFrame using the following code:
out_df.createOrReplaceTempView("currentnodename")
If you need to create additional output DataFrames, you can do so with:
out_df.createOrReplaceTempView("currentnodename_outputrelation")
Ensure that any additional output relations are listed in the "Output To" property value so that data flows to the downstream nodes (see the multi-output sketch after the default code).
Default:
import pyspark
from pyspark.sql.functions import col, lit
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
def execute_node(spark: SparkSession):
    ## Read data from the incoming node <input_relation>
    df = spark.sql("SELECT * FROM <input_relation>")
    ## Create a new dataframe with an added literal column
    out_df = df.withColumn("new", F.lit("pw"))
    ## Register the outgoing dataframe
    out_df.createOrReplaceTempView("<output_relation>")
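For the multi-output case, a minimal sketch follows; the summary relation name and the groupBy aggregation are hypothetical, and the additional relation would also need to be listed in the "Output To" property:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

def execute_node(spark: SparkSession):
    ## Read data from the incoming node
    df = spark.sql("SELECT * FROM <input_relation>")
    ## Main output: pass the data through with an added literal column
    out_df = df.withColumn("new", F.lit("pw"))
    out_df.createOrReplaceTempView("<output_relation>")
    ## Additional output: a hypothetical aggregate over the same data
    summary_df = out_df.groupBy("new").count()
    summary_df.createOrReplaceTempView("<output_relation>_summary")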