
Snowpark Python: UDFs, Stored Procedures & DataFrames


Architecture Diagram 1: Snowpark Architecture

Architecture Diagram 2: UDF Architecture

Architecture Diagram 3: Stored Procedures Architecture

Architecture Diagram 4: Snowpark DataFrame Lazy Evaluation

[Figure: Snowpark DataFrame lazy evaluation pipeline. Stages: DataFrame API (.select(), .filter(); no execution yet), plan building (query optimization, predicate pushdown), SQL translation (Python to SQL, column pruning), execution (triggered by collect(); runs in parallel on the warehouse). Operations build the plan; actions such as collect and count trigger execution; results are cached locally.]

A Snowpark DataFrame is a lazily evaluated abstraction that represents a computation plan. Transformations (select, filter, join, group_by) extend the plan without executing anything. Execution occurs only when an action is invoked (collect, count, write), at which point the optimized plan is translated into SQL and run on a Snowflake warehouse.
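The plan-then-execute behavior described above can be illustrated without a Snowflake connection. The following is a minimal sketch in plain Python, not Snowpark's implementation: `LazyFrame` and its methods are hypothetical names, and the "execution" step only records the generated SQL in a list.

```python
from dataclasses import dataclass
from typing import List, Tuple


@dataclass(frozen=True)
class LazyFrame:
    """Toy stand-in for a lazy DataFrame: it records a plan, it does not run it."""
    table: str
    columns: Tuple[str, ...] = ("*",)
    predicates: Tuple[str, ...] = ()

    def select(self, *cols: str) -> "LazyFrame":
        # Transformation: returns a new plan, nothing is executed.
        return LazyFrame(self.table, cols, self.predicates)

    def filter(self, predicate: str) -> "LazyFrame":
        # Transformation: appends a predicate to the plan.
        return LazyFrame(self.table, self.columns, self.predicates + (predicate,))

    def to_sql(self) -> str:
        # Translate the accumulated plan into a single SQL statement.
        sql = f"SELECT {', '.join(self.columns)} FROM {self.table}"
        if self.predicates:
            sql += " WHERE " + " AND ".join(f"({p})" for p in self.predicates)
        return sql

    def collect(self, executed: List[str]) -> str:
        # Action: only here is the SQL "sent" (recorded in a list for this demo).
        sql = self.to_sql()
        executed.append(sql)
        return sql


executed: List[str] = []
plan = LazyFrame("sales_data").filter("amount > 1000").select("region", "amount")
statements_before_action = len(executed)  # still 0: building the plan ran nothing
plan.collect(executed)                    # the action produces one statement
print(executed[0])
```

Two chained transformations end up as one SQL statement, which is the property that lets the real planner push predicates down and prune columns before anything runs.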

A Snowpark Session is the entry point for all Snowpark operations. It manages connection to Snowflake, handles authentication, and provides methods to create DataFrames, register UDFs, and manage transactions. Each session represents a single authenticated connection.

Snowpark Execution Cost
$$total\_cost = warehouse\_credits \times execution\_time$$
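As a worked example of the cost formula, the sketch below treats `warehouse_credits` as a credits-per-hour rate and `execution_time` as billed runtime. The function name, the per-hour interpretation, and the 60-second minimum (a simplification of how a warehouse is billed each time it resumes) are assumptions made for illustration, not values taken from this lesson.

```python
def estimate_credits(credits_per_hour: float,
                     runtime_seconds: float,
                     min_billed_seconds: float = 60.0) -> float:
    """Estimate credits as rate x billed time (assumed simplification)."""
    # Assumption: runtime shorter than the minimum is billed at the minimum.
    billed_seconds = max(runtime_seconds, min_billed_seconds)
    return credits_per_hour * billed_seconds / 3600.0


# A warehouse assumed to cost 1 credit/hour, running for a full hour.
one_hour = estimate_credits(credits_per_hour=1.0, runtime_seconds=3600)

# A warehouse assumed to cost 2 credits/hour, running a 30-second job:
# billed as 60 seconds under the assumed minimum.
short_job = estimate_credits(credits_per_hour=2.0, runtime_seconds=30)

print(one_hour, short_job)
```

The same arithmetic applies whether the work is a plain SQL query or a Snowpark pipeline, since both run on a warehouse.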

UDF Execution Model

  • Python UDF: Runs in a Python interpreter for each row, overhead ~1-10 ms/row; best for lightweight transforms
  • Python Stored Proc: Runs once per call rather than once per row, so per-row overhead does not apply; best for batch operations
  • Batch execution: Group rows into batches of 100-1000 for UDF processing
  • Memory: UDFs run within the warehouse node's memory; avoid holding large state
  • Performance: Vectorized execution (1.5-3× faster) available for numeric operations
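The batching idea in the list above can be sketched with the standard library alone. This is an illustration of amortizing per-call overhead, not Snowpark's mechanism (Snowpark's vectorized UDFs receive batches as pandas objects); `batched`, `apply_in_batches`, and `add_tax` are hypothetical helper names.

```python
from itertools import islice
from typing import Callable, Iterable, Iterator, List


def batched(rows: Iterable[float], size: int) -> Iterator[List[float]]:
    """Yield consecutive lists of at most `size` rows."""
    if size < 1:
        raise ValueError("size must be at least 1")
    iterator = iter(rows)
    while True:
        batch = list(islice(iterator, size))
        if not batch:
            return
        yield batch


def apply_in_batches(rows: Iterable[float], size: int,
                     fn: Callable[[List[float]], List[float]]) -> List[float]:
    """Call `fn` once per batch instead of once per row."""
    results: List[float] = []
    for batch in batched(rows, size):
        results.extend(fn(batch))
    return results


def add_tax(batch: List[float]) -> List[float]:
    # One call handles the whole batch; an 8% rate is assumed for the demo.
    return [round(amount * 1.08, 2) for amount in batch]


amounts = [100.0, 250.0, 40.0, 10.0, 5.0]
with_tax = apply_in_batches(amounts, size=2, fn=add_tax)
print(with_tax)
```

With a batch size of 2, five rows cost three function calls instead of five; at the 100-1000 batch sizes mentioned above the call count drops by two to three orders of magnitude.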

Use Snowpark for: ETL pipelines (DataFrame transformations), ML feature engineering, data quality validation, and complex data processing. Prefer stored procedures over UDFs for batch operations. Use cache_result() for frequently accessed DataFrames.

  • Lazy evaluation: Operations build plans; execution only on actions
  • SQL translation: DataFrame operations compile to optimized SQL
  • Multi-language: Python, Java, Scala support with uniform DataFrame API
  • ML integration: scikit-learn, XGBoost, PyTorch via Snowpark ML
  • Cost model: Same as regular queries (warehouse credits × execution time)


Detailed Explanation

What is Snowpark?

Snowpark is Snowflake's developer framework for writing data-intensive applications in Python, Java, and Scala that execute within Snowflake.


Core Abstraction: DataFrames

  • Lazy evaluation: operations build computation plans and execute only on actions
  • Actions: collect(), count(), show(), write.mode().save_as_table()
  • Transformations: filter(), select(), group_by(), agg(), join()
  • Automatic SQL translation with predicate pushdown and column pruning

UDF Implementation Patterns

| Type | Input → Output | Use Case |
|---|---|---|
| Scalar UDF | 1 row → 1 value | Data transformations |
| Table UDF (UDTF) | 1 row → N rows | Data expansion |
| Aggregate UDF | N rows → 1 value | Custom aggregations |

  • Keep UDF logic simple; avoid external dependencies
  • Batch processing reduces per-row overhead
  • Vectorized execution (1.5-3× faster) for numeric operations
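The three input/output shapes in the table can be exercised locally as plain Python handlers before anything is registered. This is a sketch under assumptions: the class and function names are hypothetical, and the aggregate handler's `aggregate_state`, `accumulate`, `merge`, and `finish` members follow the handler shape as I understand Snowflake documents it for Python aggregate functions, which should be checked against the current documentation.

```python
import json
from typing import Iterator, Tuple


def calculate_tax(amount: float, rate_percent: float) -> float:
    """Scalar shape: one input row in, one value out."""
    return amount * rate_percent / 100.0


class ExplodePairs:
    """Table-function shape: process() may yield many rows per input row."""

    def process(self, json_str: str) -> Iterator[Tuple[str, str]]:
        for key, value in json.loads(json_str).items():
            yield (key, str(value))


class SumOfSquares:
    """Aggregate shape: accumulate rows, merge partial states, then finish."""

    def __init__(self) -> None:
        self._total = 0.0

    @property
    def aggregate_state(self) -> float:
        # Partial result that can be combined with another partition's state.
        return self._total

    def accumulate(self, value: float) -> None:
        self._total += value * value

    def merge(self, other_state: float) -> None:
        self._total += other_state

    def finish(self) -> float:
        return self._total


tax = calculate_tax(200.0, 8.0)
pairs = list(ExplodePairs().process('{"a": 1, "b": 2}'))

# Simulate two partitions that are aggregated separately and then merged.
left, right = SumOfSquares(), SumOfSquares()
left.accumulate(1.0)
left.accumulate(2.0)
right.accumulate(3.0)
left.merge(right.aggregate_state)
total = left.finish()

print(tax, pairs, total)
```

Keeping handlers free of session or connection state, as here, makes them unit-testable and matches the advice to keep UDF logic simple.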

Stored Procedures for Complex Workflows

  • Access Session object for dynamic SQL, data read/write
  • Transaction management for data consistency
  • Useful for ETL, data quality checks, complex business logic
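The transaction-management point can be sketched as a commit-or-rollback wrapper. To keep the example runnable without a Snowflake account, `RecordingSession` is a hypothetical stand-in that only records SQL text; with a real Snowpark session, each `session.run(sql)` call would correspond to `session.sql(sql).collect()`.

```python
from typing import List


class RecordingSession:
    """Hypothetical stand-in for a Snowpark Session; it only records SQL text."""

    def __init__(self, fail_on: str = "") -> None:
        self.statements: List[str] = []
        self._fail_on = fail_on

    def run(self, sql: str) -> None:
        self.statements.append(sql)
        # Simulate a failing statement when its text contains `fail_on`.
        if self._fail_on and self._fail_on in sql:
            raise RuntimeError(f"simulated failure: {sql}")


def run_in_transaction(session: RecordingSession, statements: List[str]) -> str:
    """Run all statements atomically: commit if all succeed, else roll back."""
    session.run("BEGIN")
    try:
        for sql in statements:
            session.run(sql)
    except Exception as exc:
        session.run("ROLLBACK")
        return f"rolled back: {exc}"
    session.run("COMMIT")
    return "committed"


transfer = [
    "UPDATE inventory SET quantity = quantity - 5 WHERE store_id = 'A'",
    "UPDATE inventory SET quantity = quantity + 5 WHERE store_id = 'B'",
]

ok_session = RecordingSession()
ok_result = run_in_transaction(ok_session, transfer)

bad_session = RecordingSession(fail_on="+ 5")
bad_result = run_in_transaction(bad_session, transfer)

print(ok_result, "|", bad_result)
```

The explicit BEGIN matters: without it, each statement would be committed on its own, and a failure in the second update would leave the first one applied.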

Session Management

  • Entry point for all Snowpark operations
  • Configurable: account, warehouse, database/schema context
  • Production: use dedicated service accounts, connection pooling
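For the configuration keys listed above, one option is to assemble the connection parameters from environment variables rather than hard-coding credentials. The sketch below is an assumption-laden example: the `SNOWFLAKE_` variable names and the `connection_params_from_env` helper are hypothetical, and only the dictionary keys (account, user, password, role, warehouse, database, schema) come from this lesson.

```python
from typing import Dict, List, Mapping

REQUIRED_KEYS = ("account", "user", "password")
OPTIONAL_KEYS = ("role", "warehouse", "database", "schema")


def connection_params_from_env(env: Mapping[str, str],
                               prefix: str = "SNOWFLAKE_") -> Dict[str, str]:
    """Build a Snowpark-style config dict from environment-like variables."""
    params: Dict[str, str] = {}
    missing: List[str] = []
    for key in REQUIRED_KEYS:
        value = env.get(prefix + key.upper())
        if value:
            params[key] = value
        else:
            missing.append(prefix + key.upper())
    if missing:
        # Fail early with the variable names, never with secret values.
        raise KeyError("missing environment variables: " + ", ".join(missing))
    for key in OPTIONAL_KEYS:
        value = env.get(prefix + key.upper())
        if value:
            params[key] = value
    return params


# Demo with an in-memory mapping; in real use, pass os.environ instead.
fake_env = {
    "SNOWFLAKE_ACCOUNT": "your_account",
    "SNOWFLAKE_USER": "svc_etl",
    "SNOWFLAKE_PASSWORD": "not-a-real-secret",
    "SNOWFLAKE_WAREHOUSE": "ANALYTICS_WH",
}
params = connection_params_from_env(fake_env)
print(sorted(params))
```

The resulting dictionary has the same shape as the one passed to `Session.builder.configs(...)` in the session example of this lesson.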

Best Practices

  1. Optimize DataFrame operations with predicate pushdown and column pruning
  2. Keep UDFs simple; use batch processing for large datasets
  3. Manage sessions properly with appropriate warehouse sizes
  4. Handle errors with try-except blocks and logging
  5. Test with representative data volumes

Key Takeaway: Prefer Snowpark over Spark for transformations that can execute within Snowflake to minimize data movement.

Key Concepts Table

| Component | Purpose | Execution | Use Case |
|---|---|---|---|
| DataFrame | Distributed dataset | SQL translation | Data manipulation |
| UDF | Custom function | Row/batch processing | Data transformation |
| UDTF | Table function | Row processing | Data expansion |
| Stored Procedure | Complex workflow | Full Python runtime | ETL, business logic |

| Return Type | Usage | Example |
|---|---|---|
| StringType | Text results | "Processed 100 rows" |
| IntegerType | Numeric results | 42 |
| FloatType | Decimal results | 3.14 |
| BooleanType | True/false | True |
| PandasDataFrame | Result sets | df.to_pandas() |

| Session Method | Purpose | Example |
|---|---|---|
| table() | Read table | session.table("sales") |
| sql() | Execute SQL | session.sql("SELECT 1") |
| write | Write data | df.write.save_as_table() |
| use_database() | Set context | session.use_database("db") |

Code Examples

Session Management and DataFrame Basics

# Example 1: Basic DataFrame operations
# Session is the entry point for all Snowpark operations
# It manages connection, authentication, and transaction context
from snowflake.snowpark import Session
from snowflake.snowpark.functions import col, sum as sum_, avg, count, lit

# Create session with explicit configuration
# Each config option controls connection behavior:
#   account: Snowflake account identifier
#   user: Database user for authentication
#   password: Authentication credential
#   role: RBAC role controlling access permissions
#   warehouse: Compute resource for query execution
#   database/schema: Default context for unqualified object names
session = Session.builder.configs({
    "account": "your_account",
    "user": "your_user",
    "password": "your_password",
    "role": "ANALYTICS_ROLE",
    "warehouse": "ANALYTICS_WH",
    "database": "ANALYTICS_DB",
    "schema": "PUBLIC"
}).create()

# Read table into DataFrame (lazy evaluation - no execution yet)
df = session.table("sales_data")

# Filter: adds WHERE clause to the query plan
# group_by: adds GROUP BY clause
# agg: applies aggregate functions
# collect(): triggers execution and returns results to client
result = df.filter(col("amount") > 1000) \
           .group_by("region") \
           .agg(
               sum_(col("amount")).alias("total_amount"),
               avg(col("amount")).alias("avg_amount"),
               count("*").alias("row_count")
           ) \
           .order_by(col("total_amount").desc()) \
           .collect()

# Show results
for row in result:
    print(f"Region: {row['REGION']}, Total: {row['TOTAL_AMOUNT']}, Avg: {row['AVG_AMOUNT']}")

# Alternative: show() runs the query and prints the first rows (10 by default) as a formatted table
df.filter(col("amount") > 1000).group_by("region").agg(sum_(col("amount")).alias("total")).show()

Scalar and Table UDFs

# Example 2: Scalar UDF
# Scalar UDFs process one row at a time, returning a single value
# Used in SELECT, WHERE, and CASE expressions
from snowflake.snowpark.types import FloatType, StringType
from snowflake.snowpark.functions import udf  # the udf decorator lives in snowpark.functions

# @udf decorator registers the function as a Snowflake UDF
# Parameters:
#   name: SQL function name for invocation
#   return_type: Data type of the return value
#   input_types: List of input parameter types
#   packages: Python packages available in the UDF runtime
@udf(name="calculate_tax",
     return_type=FloatType(),
     input_types=[FloatType(), FloatType()],
     packages=["pandas"])
def calculate_tax(amount: float, tax_rate: float) -> float:
    """Calculate tax amount given a price and tax rate percentage."""
    return amount * (tax_rate / 100)

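# The @udf decorator above already registered the function under the name
# "calculate_tax", so no separate register() call is needed in this session.
# For use from other sessions, register with is_permanent=True and a stage_location.

# Use UDF in SQL query (tax_rate is a percentage, so 8.0 means 8%)
result = session.sql("""
    SELECT product, amount, 
           calculate_tax(amount, 8.0) as tax,
           amount + calculate_tax(amount, 8.0) as total_with_tax
    FROM sales
""").collect()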

# Example 3: Table UDF (UDTF)
# UDTFs process one row and can return multiple rows (1-to-many expansion)
# Used with LATERAL JOIN for data expansion
from snowflake.snowpark.types import StructType, StructField, StringType, IntegerType
from snowflake.snowpark.functions import udtf

# output_schema defines the structure of returned rows
# input_types defines the input parameters
# The handler must be a class; process() is called once per input row
@udtf(name="parse_json_array",
      output_schema=StructType([
          StructField("key", StringType()),
          StructField("value", StringType())
      ]),
      input_types=[StringType()])
class ParseJsonArray:
    """Parse a JSON object and return key-value pairs as rows."""
    def process(self, json_str: str):
        import json
        data = json.loads(json_str)
        for key, value in data.items():
            yield (key, str(value))  # yield produces rows (generator pattern)

# The decorator already registered the UDTF; call it by name from SQL
result = session.sql("""
    SELECT *
    FROM TABLE(parse_json_array('{"a": 1, "b": 2, "c": 3}'))
""").collect()

# UDTF with multiple input columns
@udtf(name="split_string",
      output_schema=StructType([
          StructField("token", StringType()),
          StructField("position", IntegerType())
      ]),
      input_types=[StringType(), StringType()])
class SplitString:
    """Split a string by delimiter and return tokens with positions."""
    def process(self, text: str, delimiter: str):
        tokens = text.split(delimiter)
        for i, token in enumerate(tokens):
            yield (token, i)

# Only the columns declared in output_schema are available to SELECT
result = session.sql("""
    SELECT token, position
    FROM TABLE(split_string('hello-world-snowflake', '-'))
""").collect()
Stored Procedures for Complex Workflows

# Example 4: Stored procedure with full parameter documentation
from snowflake.snowpark import Session

def process_sales_data(session: Session, region: str) -> str:
    """
    Process sales data for a specific region.
    
    Args:
        session: Snowpark session for data access
        region: Geographic region code (e.g., 'US', 'EU', 'APAC')
    
    Returns:
        Summary string with processing statistics
    """
    # Read data using session.table() - returns lazy DataFrame
    df = session.table("raw_sales")
    
    # Filter: WHERE clause pushed down to Snowflake
    filtered = df.filter(col("region") == region.upper())
    
    # Aggregate: GROUP BY with SUM
    aggregated = filtered.group_by("product") \
                         .agg(sum_(col("amount")).alias("total"))
    
    # Write results using save_as_table()
    #   mode="overwrite": replaces the table and all of its existing rows
    #   mode="append": INSERT INTO (adds to existing data)
    aggregated.write.mode("overwrite") \
             .save_as_table(f"sales_{region.lower()}")
    
    # Count triggers execution; returns integer
    row_count = filtered.count()
    return f"Processed {row_count} rows for {region}"

# Register procedure with explicit type declarations
#   return_type: Python return type -> Snowflake type mapping
#   input_types: Python parameter types -> Snowflake type mapping
#   packages: Available Python packages in the stored proc runtime
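# name is required so that session.call("process_sales_data", ...) can find it;
# without it the procedure is registered under a generated temporary name
session.sproc.register(
    process_sales_data,
    name="process_sales_data",
    return_type=StringType(),
    input_types=[StringType()],
    packages=["snowflake-snowpark-python"],
    replace=True
)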

# Call procedure using session.call()
result = session.call("process_sales_data", "US")
print(result)  # e.g. "Processed 15000 rows for US"

# Example 5: Stored procedure with transaction management
from snowflake.snowpark import Session

def transfer_inventory(session: Session, source_store: str, dest_store: str, product_id: str, quantity: int) -> str:
    """
    Transfer inventory between stores with transaction safety.
    
    Uses Snowflake transactions to ensure atomicity:
    - If both operations succeed, changes are committed
    - If either operation fails, all changes are rolled back
    """
    try:
        # Read source inventory
        source_df = session.table("inventory").filter(
            (col("store_id") == source_store) & 
            (col("product_id") == product_id)
        )
        
        source_record = source_df.collect()
        if not source_record:
            return f"Error: Product {product_id} not found in store {source_store}"
        
        current_qty = source_record[0]["QUANTITY"]
        if current_qty < quantity:
            return f"Error: Insufficient inventory ({current_qty} available, {quantity} requested)"
        
        # Start an explicit transaction so both statements commit or roll back together.
        # Note: values are interpolated into SQL text here; validate inputs in real code.
        session.sql("BEGIN").collect()
        try:
            # Deduct from source
            session.sql(f"""
                UPDATE inventory 
                SET quantity = quantity - {quantity},
                    last_updated = CURRENT_TIMESTAMP()
                WHERE store_id = '{source_store}' 
                AND product_id = '{product_id}'
            """).collect()
            
            # Add to destination (upsert)
            session.sql(f"""
                MERGE INTO inventory t
                USING (SELECT '{dest_store}' AS store_id, '{product_id}' AS product_id, {quantity} AS quantity) s
                ON t.store_id = s.store_id AND t.product_id = s.product_id
                WHEN MATCHED THEN UPDATE SET quantity = t.quantity + s.quantity
                WHEN NOT MATCHED THEN INSERT (store_id, product_id, quantity)
                VALUES (s.store_id, s.product_id, s.quantity)
            """).collect()
            
            session.sql("COMMIT").collect()
        except Exception:
            # Rollback is not automatic here: undo both statements, then re-raise
            session.sql("ROLLBACK").collect()
            raise
        
        return f"Transferred {quantity} units of {product_id} from {source_store} to {dest_store}"
        
    except Exception as e:
        # Any open transaction has already been rolled back above
        return f"Transfer failed: {str(e)}"

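session.sproc.register(
    transfer_inventory,
    name="transfer_inventory",  # explicit name so the procedure can be called later
    return_type=StringType(),
    input_types=[StringType(), StringType(), StringType(), IntegerType()],
    packages=["snowflake-snowpark-python"]
)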

Advanced DataFrame Operations

# Example 6: Advanced DataFrame transformations
from snowflake.snowpark.functions import when, lit, col, concat, to_date, datediff

# Complex transformations with conditional logic
df = session.table("customers") \
    .with_column("age_group",
        when(col("age") < 25, lit("Young"))
        .when(col("age") < 50, lit("Adult"))
        .otherwise(lit("Senior"))
    ) \
    .with_column("full_name",
        concat(col("first_name"), lit(" "), col("last_name"))
    ) \
    .with_column("membership_years",
        datediff("year", col("join_date"), to_date(lit("2024-01-01")))
    ) \
    .select("customer_id", "full_name", "age_group", "email", "membership_years")

# Write the result to a table, replacing any existing contents
df.write.mode("overwrite").save_as_table("customer_segments")

# Window functions for analytics
from snowflake.snowpark import Window
from snowflake.snowpark.functions import rank  # rank() is used below

window_spec = Window.partition_by("region").order_by(col("sales_amount").desc())

ranked_df = session.table("sales") \
    .with_column("rank", rank().over(window_spec)) \
    .with_column("running_total", sum_(col("sales_amount")).over(
        Window.partition_by("region").order_by("sale_date")
    ))

# Join operations
customers_df = session.table("customers")
orders_df = session.table("orders")

joined_df = orders_df.join(
    customers_df,
    orders_df["customer_id"] == customers_df["customer_id"],
    how="left"  # "inner", "left", "right", "outer", "cross"
)

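# Deduplication
deduped_df = orders_df.drop_duplicates(["order_id"])
# To keep a specific row per key (here the latest order per customer), rank rows
# with a window; order_by() followed by drop_duplicates() does not guarantee
# which duplicate survives
from snowflake.snowpark import Window
from snowflake.snowpark.functions import row_number
latest_first = Window.partition_by("customer_id").order_by(col("created_at").desc())
deduped_df = orders_df.with_column("rn", row_number().over(latest_first)) \
    .filter(col("rn") == 1) \
    .drop("rn")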

Error Handling and Production Patterns

# Example 7: Robust ETL with error handling and logging
from snowflake.snowpark import Session
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def robust_etl(session: Session, source_table: str, target_table: str) -> str:
    """
    Production-grade ETL with data quality validation and error handling.
    
    Features:
    - Null detection and handling
    - Duplicate removal
    - Row count validation
    - Detailed logging for debugging
    """
    try:
        # Read source data
        source_df = session.table(source_table)
        
        # Data quality check: count nulls in key columns
        null_count = source_df.filter(col("id").is_null()).count()
        if null_count > 0:
            logger.warning(f"Found {null_count} null IDs in {source_table}")
        
        # Data quality check: count duplicates among rows with a non-null id,
        # so that null rows are not also reported as duplicates
        non_null_df = source_df.filter(col("id").is_not_null())
        total_rows = non_null_df.count()
        unique_rows = non_null_df.drop_duplicates(["id"]).count()
        duplicate_count = total_rows - unique_rows
        if duplicate_count > 0:
            logger.warning(f"Found {duplicate_count} duplicate rows")
        
        # Clean data
        clean_df = source_df.filter(col("id").is_not_null()) \
                           .drop_duplicates(["id"])
        
        # Write results
        clean_df.write.mode("overwrite").save_as_table(target_table)
        
        final_count = clean_df.count()
        logger.info(f"ETL complete: {final_count} rows written to {target_table}")
        
        return f"Success: Processed {final_count} rows (removed {null_count} nulls, {duplicate_count} duplicates)"
        
    except Exception as e:
        logger.error(f"ETL failed: {str(e)}")
        raise

# Example 8: DataFrame caching for repeated access
# cache_result() materializes the DataFrame into a temporary table so later
# queries read the stored rows instead of re-running the original query
def analyze_sales(session: Session) -> dict:
    """Analyze sales data with caching for efficiency."""
    base_df = session.table("sales_data")
    
    # Cache base filtered data (avoids re-filtering for multiple aggregations)
    # Recent Snowpark versions return a Table object from cache_result()
    filtered_df = base_df.filter(col("order_date") >= "2024-01-01").cache_result()
    
    # Multiple aggregations use cached data
    total_sales = filtered_df.agg(sum_(col("amount")).alias("total")).collect()[0]["TOTAL"]
    avg_order = filtered_df.agg(avg(col("amount")).alias("avg")).collect()[0]["AVG"]
    order_count = filtered_df.count()
    
    # Drop the temporary table early; otherwise it lasts until the session ends
    filtered_df.drop_table()
    
    return {
        "total_sales": total_sales,
        "avg_order_value": avg_order,
        "order_count": order_count
    }

# Example 9: Reading semi-structured data
# Snowpark handles VARIANT, OBJECT, ARRAY types natively
json_df = session.table("json_events") \
    .select(
        col("event_id"),
        col("payload")["event_type"].as_("event_type"),    # Extract from OBJECT
        col("payload")["timestamp"].as_("timestamp"),
        col("payload")["data"].as_("event_data")
    )

# Example 10: Writing to stages for external systems
# copy_into_location() unloads the DataFrame to a stage (COPY INTO <location>)
# and executes immediately
session.table("analytics_output").write.copy_into_location(
    "@my_stage/output/",
    file_format_type="parquet",
    header=True,
    format_type_options={"COMPRESSION": "SNAPPY"},
)

Performance Metrics

| Metric | Target | Warning | Critical |
|---|---|---|---|
| DataFrame Translation Time | < 1 s | 1-5 s | > 5 s |
| UDF Execution Time | < 100 ms | 100-500 ms | > 500 ms |
| Stored Procedure Time | < 30 s | 30-120 s | > 120 s |
| Data Transfer Latency | < 1 s | 1-10 s | > 10 s |
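The thresholds in the table can be turned into a small classifier for monitoring scripts. This is a sketch: the metric keys and the `classify` function are hypothetical names, while the boundary values are the ones listed in the table (expressed in seconds).

```python
from typing import Dict, Tuple

# (upper bound of "target", upper bound of "warning"), both in seconds.
THRESHOLDS: Dict[str, Tuple[float, float]] = {
    "dataframe_translation": (1.0, 5.0),
    "udf_execution": (0.1, 0.5),
    "stored_procedure": (30.0, 120.0),
    "data_transfer": (1.0, 10.0),
}


def classify(metric: str, seconds: float) -> str:
    """Return 'target', 'warning', or 'critical' for a measured duration."""
    target_max, warning_max = THRESHOLDS[metric]
    if seconds < target_max:
        return "target"
    if seconds <= warning_max:
        return "warning"
    return "critical"


samples = [
    ("udf_execution", 0.05),
    ("udf_execution", 0.25),
    ("stored_procedure", 121.0),
]
for metric, seconds in samples:
    print(metric, seconds, classify(metric, seconds))
```

Values that sit exactly on a boundary (for example 1.0 s translation time) are treated as warnings here, matching the table's "< 1 s" target and "> 5 s" critical ranges.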

Best Practices

  1. Optimize DataFrame operations: Use predicate pushdown, column pruning, and partitioning to minimize data movement.

  2. Keep UDFs simple: Avoid expensive operations in UDFs. Use batch processing for large datasets.

  3. Manage sessions properly: Use appropriate warehouse sizes and implement connection pooling for production workloads.

  4. Handle errors gracefully: Implement try-except blocks and logging in stored procedures.

  5. Test with representative data: Validate performance and correctness with data volumes similar to production.

  6. Use appropriate return types: Choose the most efficient return type for your use case.

  7. Leverage built-in functions: Use Snowpark's built-in functions instead of custom UDFs when possible.

  8. Monitor execution metrics: Track DataFrame translation times, UDF execution times, and data transfer volumes.

  9. Implement transaction management: Use transactions in stored procedures for data consistency.

  10. Document code thoroughly: Include comments explaining complex logic and data transformations.


Additional Theory: Lazy Evaluation in Snowpark

Snowpark DataFrames use lazy evaluation: operations build a computation plan but do not execute until an action is invoked. This closely resembles Apache Spark's model and provides several benefits:

  1. Query optimization: The planner can reorder operations, push predicates down, and eliminate unnecessary columns before execution.
  2. Reduced data transfer: Only the final result is returned to the client, not intermediate results.
  3. Batch processing: Multiple operations are fused into a single SQL execution plan.

Actions that trigger execution:

  • collect(): Returns all rows to the client
  • count(): Returns the total row count
  • show(): Prints formatted output
  • write.mode().save_as_table(): Writes data to a table
  • to_pandas(): Converts to a pandas DataFrame on the client

Transformations that are lazy:

  • filter(), select(), group_by(), agg(), join(), with_column(), order_by()

Additional Theory: UDF Performance Considerations

| Factor | Impact | Recommendation |
|---|---|---|
| Python runtime overhead | ~1-10 ms per row | Use batch processing for large datasets |
| Package imports | Cold start cost | Keep imports minimal; pre-package dependencies |
| Serialization | JSON encoding/decoding | Minimize data transfer between SQL and Python |
| Memory | Limited by warehouse node | Avoid large in-UDF state; use external tables |
| Vectorized execution | 1.5-3× faster | Prefer numeric operations that benefit from vectorization |
