Snowpark Python: UDFs, Stored Procedures & DataFrames
Architecture Diagram 1: Snowpark Architecture
Architecture Diagram 2: UDF Architecture
Architecture Diagram 3: Stored Procedures Architecture
Architecture Diagram 4: Snowpark DataFrame Lazy Evaluation
A Snowpark DataFrame is a lazily evaluated abstraction that represents a computation plan. Transformations (select, filter, join, group_by) add steps to the plan without executing anything. Execution occurs only when an action is invoked (collect, count, write); at that point the plan is translated into SQL and executed on a Snowflake warehouse, where Snowflake's query optimizer processes it.
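To make the lazy model concrete, here is a toy sketch in plain Python that needs no Snowflake connection. The LazyFrame class and its methods are hypothetical stand-ins, not the Snowpark API: transformations only record steps, and the single action collect() fuses them into one SQL string and hands it to an executor callback. It deliberately ignores subtleties such as filters applied after an aggregation.

```python
from dataclasses import dataclass, field
from typing import Callable, List, Optional


@dataclass(frozen=True)
class LazyFrame:
    """Toy lazy DataFrame: transformations record steps, only collect() runs SQL."""
    table: str
    columns: Optional[List[str]] = None  # None means SELECT *
    predicates: List[str] = field(default_factory=list)
    group_keys: List[str] = field(default_factory=list)

    # Transformations: return a new LazyFrame and execute nothing
    def filter(self, predicate: str) -> "LazyFrame":
        return LazyFrame(self.table, self.columns, self.predicates + [predicate], self.group_keys)

    def select(self, *cols: str) -> "LazyFrame":
        return LazyFrame(self.table, list(cols), self.predicates, self.group_keys)

    def group_by(self, *keys: str) -> "LazyFrame":
        return LazyFrame(self.table, self.columns, self.predicates, list(keys))

    def to_sql(self) -> str:
        """Fuse every recorded step into one SQL statement."""
        projection = ", ".join(self.columns) if self.columns else "*"
        sql = f"SELECT {projection} FROM {self.table}"
        if self.predicates:
            sql += " WHERE " + " AND ".join(self.predicates)
        if self.group_keys:
            sql += " GROUP BY " + ", ".join(self.group_keys)
        return sql

    # Action: the only method that hands SQL to the executor (the "warehouse")
    def collect(self, execute: Callable[[str], list]) -> list:
        return execute(self.to_sql())


if __name__ == "__main__":
    plan = (LazyFrame("sales_data")
            .filter("amount > 1000")
            .select("region", "SUM(amount)")
            .group_by("region"))
    print(plan.to_sql())
    # SELECT region, SUM(amount) FROM sales_data WHERE amount > 1000 GROUP BY region
```

Building `plan` issues no query; only the executor passed to collect() ever sees SQL, and it sees a single fused statement.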
A Snowpark Session is the entry point for all Snowpark operations. It manages the connection to Snowflake, handles authentication, and provides methods to create DataFrames, register UDFs and stored procedures, and run SQL statements (including BEGIN/COMMIT for transactions). Each session represents a single authenticated connection.
UDF Execution Model
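- Python UDF: Scalar UDFs are invoked once per row, so per-call overhead adds up on large inputs; best for lightweight transforms
- Python stored procedure: Runs once per call on a single warehouse node and drives work by issuing queries through its Session; best for orchestrating batch workflows rather than per-row computation
- Batch execution: Vectorized (pandas) UDFs receive rows in batches as pandas DataFrames or Series instead of one row per call; Snowflake chooses the batch size, and max_batch_size caps it
- Memory: UDFs run in a Python sandbox on warehouse nodes and share node memory; avoid large in-memory state
- Performance: Vectorized execution is typically faster than row-at-a-time execution for numeric operations, because per-call overhead is spread across a whole batch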
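The cost difference between row-at-a-time and vectorized UDFs comes from how often the handler is invoked. The plain-Python simulation below counts handler calls for both styles; run_row_udf, run_batch_udf, and the batch size of 1,000 are illustrative assumptions, not Snowflake internals (in Snowflake the engine picks real batch sizes).

```python
from typing import Callable, List, Sequence, Tuple


def run_row_udf(rows: Sequence[float],
                handler: Callable[[float], float]) -> Tuple[List[float], int]:
    """Row-at-a-time style: the handler is called once per row."""
    out: List[float] = []
    calls = 0
    for value in rows:
        out.append(handler(value))
        calls += 1
    return out, calls


def run_batch_udf(rows: Sequence[float],
                  handler: Callable[[List[float]], List[float]],
                  batch_size: int) -> Tuple[List[float], int]:
    """Vectorized style: the handler receives a whole batch per call."""
    out: List[float] = []
    calls = 0
    for start in range(0, len(rows), batch_size):
        out.extend(handler(list(rows[start:start + batch_size])))
        calls += 1
    return out, calls


def tax_row(amount: float) -> float:
    return amount * 0.08


def tax_batch(amounts: List[float]) -> List[float]:
    # With pandas this would be one Series operation: amounts * 0.08
    return [a * 0.08 for a in amounts]


amounts = [float(i) for i in range(10_000)]
row_result, row_calls = run_row_udf(amounts, tax_row)
batch_result, batch_calls = run_batch_udf(amounts, tax_batch, batch_size=1_000)
assert row_result == batch_result  # same answers, far fewer calls
print(row_calls, batch_calls)  # 10000 10
```

Any fixed per-call cost is paid 10,000 times in the first style and 10 times in the second, which is the effect vectorized UDFs exploit.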
Use Snowpark for: ETL pipelines (DataFrame transformations), ML feature engineering, data quality validation, and complex data processing. Use UDFs for per-row logic inside queries and stored procedures for multi-step batch workflows. Use cache_result() for DataFrames that are read repeatedly.
- Lazy evaluation: Operations build plans; execution only on actions
- SQL translation: DataFrame operations compile to optimized SQL
- Multi-language: Python, Java, Scala support with uniform DataFrame API
- ML integration: scikit-learn, XGBoost, PyTorch via Snowpark ML
- Cost model: Same as regular queries: warehouse credits are consumed for the time the warehouse runs (credits per hour for its size × runtime)
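The cost model reduces to simple arithmetic. This sketch assumes standard (not Snowpark-optimized) warehouse rates of 1 credit per hour for X-Small, doubling with each size up to 4X-Large, and per-second billing with a 60-second minimum each time a warehouse resumes. estimate_credits is a hypothetical helper; confirm actual rates against your account's pricing.

```python
# Assumed credits per hour for standard warehouses (doubles with each size)
CREDITS_PER_HOUR = {
    "XSMALL": 1, "SMALL": 2, "MEDIUM": 4, "LARGE": 8,
    "XLARGE": 16, "2XLARGE": 32, "3XLARGE": 64, "4XLARGE": 128,
}


def estimate_credits(size: str, runtime_seconds: float) -> float:
    """Credits for one resume-to-suspend run: per-second billing, 60 s minimum."""
    billed_seconds = max(runtime_seconds, 60)
    return CREDITS_PER_HOUR[size.upper()] * billed_seconds / 3600


print(estimate_credits("MEDIUM", 1800))  # 2.0
print(estimate_credits("XSMALL", 30))    # billed as a full 60 seconds
```

The 60-second minimum applies per resume, so many short jobs on a warehouse that is already running do not each pay it.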
Detailed Explanation
What is Snowpark?
Snowflake's developer framework for writing data-intensive applications in Python, Java, and Scala, executing entirely within Snowflake.
Core Abstraction: DataFrames
- Lazy evaluation: operations build computation plans and execute only on actions
- Actions: collect(), count(), show(), write.mode().save_as_table()
- Transformations: filter(), select(), group_by(), agg(), join()
- Automatic SQL translation with predicate pushdown and column pruning
UDF Implementation Patterns
| Type | Input → Output | Use Case |
|---|---|---|
| Scalar UDF | 1 row → 1 value | Data transformations |
| Table UDF (UDTF) | 1 row → N rows | Data expansion |
| Aggregate UDF (UDAF) | N rows → 1 value | Custom aggregations |
- Keep UDF logic simple; avoid unnecessary external dependencies
- Vectorized (batch) UDFs reduce per-row overhead
- Vectorized execution is typically faster for numeric operations
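For the aggregate row in the table above, a Snowflake Python UDAF handler is a class with an aggregate_state property plus accumulate, merge, and finish methods: partial states built on different rows are merged before finish() produces the result. The sketch implements an average in that shape and drives it locally; simulate_parallel_aggregate is a hypothetical driver that only mimics the parallel plan.

```python
from typing import List, Optional, Tuple


class AverageUDAF:
    """Average with the UDAF handler shape: aggregate_state, accumulate, merge, finish."""

    def __init__(self) -> None:
        self._sum = 0.0
        self._count = 0

    @property
    def aggregate_state(self) -> Tuple[float, int]:
        # Partial state that can be shipped between workers and merged
        return (self._sum, self._count)

    def accumulate(self, value: Optional[float]) -> None:
        # Called once per input row; this handler chooses to skip None (SQL NULL)
        if value is not None:
            self._sum += value
            self._count += 1

    def merge(self, other_state: Tuple[float, int]) -> None:
        # Combine a partial state produced on another partition
        other_sum, other_count = other_state
        self._sum += other_sum
        self._count += other_count

    def finish(self) -> Optional[float]:
        return self._sum / self._count if self._count else None


def simulate_parallel_aggregate(partitions: List[List[Optional[float]]]) -> Optional[float]:
    """Hypothetical driver: accumulate each partition separately, then merge."""
    partials = []
    for rows in partitions:
        handler = AverageUDAF()
        for value in rows:
            handler.accumulate(value)
        partials.append(handler.aggregate_state)
    final = AverageUDAF()
    for state in partials:
        final.merge(state)
    return final.finish()


print(simulate_parallel_aggregate([[1.0, 2.0], [3.0, None, 6.0]]))  # 3.0
```

Averaging the two partition averages (1.5 and 4.5) would give 3.0 here only by coincidence; carrying (sum, count) through merge() is what keeps the result correct for any split. In Snowpark, a class of this shape would be registered through session.udaf with its return and input types; check the UDAF documentation for the exact options.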
Stored Procedures for Complex Workflows
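- Access the Session object for dynamic SQL and data read/write
- Transaction management for data consistency: statements autocommit unless wrapped in an explicit BEGIN ... COMMIT, with ROLLBACK on error
- Useful for ETL, data quality checks, complex business logic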
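The atomicity that explicit transactions provide can be tried locally. This sketch uses Python's built-in sqlite3 module purely as a stand-in for Snowflake (it is not Snowpark): both updates of an inventory transfer run inside one explicit transaction, and any failure rolls back both. The table layout and the transfer function are hypothetical; the CHECK constraint exists only to force a failure after the first update.

```python
import sqlite3


def transfer(conn: sqlite3.Connection, src: str, dst: str, product: str, qty: int) -> None:
    """Move qty units of product from src to dst atomically, or change nothing."""
    conn.execute("BEGIN")  # explicit transaction; conn must use isolation_level=None
    try:
        row = conn.execute(
            "SELECT quantity FROM inventory WHERE store_id = ? AND product_id = ?",
            (src, product),
        ).fetchone()
        if row is None or row[0] < qty:
            raise ValueError("insufficient inventory")
        conn.execute(
            "UPDATE inventory SET quantity = quantity - ? WHERE store_id = ? AND product_id = ?",
            (qty, src, product),
        )
        # Upsert the destination row
        cur = conn.execute(
            "UPDATE inventory SET quantity = quantity + ? WHERE store_id = ? AND product_id = ?",
            (qty, dst, product),
        )
        if cur.rowcount == 0:
            conn.execute(
                "INSERT INTO inventory (store_id, product_id, quantity) VALUES (?, ?, ?)",
                (dst, product, qty),
            )
        conn.execute("COMMIT")
    except Exception:
        conn.execute("ROLLBACK")  # undoes the source deduction too
        raise


conn = sqlite3.connect(":memory:", isolation_level=None)  # manual transaction control
conn.execute(
    "CREATE TABLE inventory (store_id TEXT, product_id TEXT, quantity INTEGER, "
    "PRIMARY KEY (store_id, product_id), CHECK (store_id <> 'CLOSED'))"
)
conn.execute("INSERT INTO inventory VALUES ('S1', 'P1', 10)")
transfer(conn, "S1", "S2", "P1", 4)  # commits: S1 has 6, S2 has 4
try:
    transfer(conn, "S1", "CLOSED", "P1", 3)  # INSERT violates CHECK, so both updates roll back
except sqlite3.IntegrityError:
    pass
print(conn.execute("SELECT store_id, quantity FROM inventory ORDER BY store_id").fetchall())
# [('S1', 6), ('S2', 4)]
```

The failed transfer had already deducted 3 units from S1 before the INSERT failed; the ROLLBACK restores them, which is exactly what autocommit mode would not do.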
Session Management
- Entry point for all Snowpark operations
- Configurable: account, warehouse, database/schema context
- Production: use dedicated service accounts (key-pair authentication avoids storing passwords) and reuse sessions rather than creating one per operation
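For production sessions, credentials are better read from the environment or a secrets manager than hard-coded. The helper below is a hypothetical sketch: it builds the dictionary that would be passed to Session.builder.configs(...), and the SNOWFLAKE_* variable names are assumptions chosen for illustration. It avoids importing Snowpark so it can run anywhere.

```python
import os
from typing import Dict, Mapping, Optional

# Hypothetical mapping from Session config keys to environment variable names
ENV_KEYS = {
    "account": "SNOWFLAKE_ACCOUNT",
    "user": "SNOWFLAKE_USER",
    "password": "SNOWFLAKE_PASSWORD",
    "role": "SNOWFLAKE_ROLE",
    "warehouse": "SNOWFLAKE_WAREHOUSE",
    "database": "SNOWFLAKE_DATABASE",
    "schema": "SNOWFLAKE_SCHEMA",
}
REQUIRED = ("account", "user", "password")


def session_config(env: Optional[Mapping[str, str]] = None) -> Dict[str, str]:
    """Build a Session config dict from environment variables, failing fast on gaps."""
    env = os.environ if env is None else env
    # Keep only settings that are present and non-empty
    config = {key: env[var] for key, var in ENV_KEYS.items() if env.get(var)}
    missing = [key for key in REQUIRED if key not in config]
    if missing:
        raise KeyError(f"missing Snowflake settings: {', '.join(missing)}")
    return config


# Usage (requires snowflake-snowpark-python and real credentials):
# from snowflake.snowpark import Session
# session = Session.builder.configs(session_config()).create()
```

Failing at startup when a required setting is absent is easier to diagnose than an authentication error deep inside a job.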
Best Practices
- Optimize DataFrame operations with predicate pushdown and column pruning
- Keep UDFs simple; use batch processing for large datasets
- Manage sessions properly with appropriate warehouse sizes
- Handle errors with try-except blocks and logging
- Test with representative data volumes
Key Takeaway: Prefer Snowpark over Spark for transformations that can execute within Snowflake to minimize data movement.
Key Concepts Table
| Component | Purpose | Execution | Use Case |
|---|---|---|---|
| DataFrame | Lazily evaluated query plan | SQL translation | Data manipulation |
| UDF | Custom function | Row/batch processing | Data transformation |
| UDTF | Table function | Row processing | Data expansion |
| Stored Procedure | Complex workflow | Full Python runtime | ETL, business logic |
| Return Type | Usage | Example |
|---|---|---|
| StringType | Text results | "Processed 100 rows" |
| IntegerType | Numeric results | 42 |
| FloatType | Floating-point results | 3.14 |
| BooleanType | True/false | True |
| StructType (table stored procedure) | Result sets | return a Snowpark DataFrame |
| Session Method | Purpose | Example |
|---|---|---|
| table() | Read table | session.table("sales") |
| sql() | Execute SQL | session.sql("SELECT 1") |
| DataFrame.write (a DataFrame property, not a Session method) | Write data | df.write.save_as_table() |
| use_database() | Set context | session.use_database("db") |
Code Examples
Session Management and DataFrame Basics
# Example 1: Basic DataFrame operations
# Session is the entry point for all Snowpark operations
# It manages connection, authentication, and transaction context
from snowflake.snowpark import Session
from snowflake.snowpark.functions import col, sum as sum_, avg, count, lit
# Create session with explicit configuration
# Each config option controls connection behavior:
# account: Snowflake account identifier
# user: Database user for authentication
# password: Authentication credential
# role: RBAC role controlling access permissions
# warehouse: Compute resource for query execution
# database/schema: Default context for unqualified object names
session = Session.builder.configs({
"account": "your_account",
"user": "your_user",
"password": "your_password",
"role": "ANALYTICS_ROLE",
"warehouse": "ANALYTICS_WH",
"database": "ANALYTICS_DB",
"schema": "PUBLIC"
}).create()
# Read table into DataFrame (lazy evaluation - no execution yet)
df = session.table("sales_data")
# Filter: adds WHERE clause to the query plan
# group_by: adds GROUP BY clause
# agg: applies aggregate functions
# collect(): triggers execution and returns results to client
result = df.filter(col("amount") > 1000) \
.group_by("region") \
.agg(
sum_(col("amount")).alias("total_amount"),
avg(col("amount")).alias("avg_amount"),
count("*").alias("row_count")
) \
.order_by(col("total_amount").desc()) \
.collect()
# Print results; Row fields are accessed by their (uppercase) column names
for row in result:
    print(f"Region: {row['REGION']}, Total: {row['TOTAL_AMOUNT']}, Avg: {row['AVG_AMOUNT']}")
# Alternative: show() executes the query and prints the first 10 rows; it returns None
df.filter(col("amount") > 1000).group_by("region").agg(sum_(col("amount")).alias("total")).show()
Scalar and Table UDFs
# Example 2: Scalar UDF
# Scalar UDFs receive one row's arguments per call and return a single value
# Usable anywhere a SQL expression is allowed (SELECT, WHERE, CASE)
from snowflake.snowpark.functions import col, lit, udf
from snowflake.snowpark.types import FloatType
# The @udf decorator registers the function with the active session immediately
# Parameters:
# name: SQL function name for invocation
# return_type: Data type of the return value
# input_types: List of input parameter types
# replace: overwrite an existing UDF with the same name
# Without is_permanent=True (plus stage_location) the UDF is temporary and
# exists only for the current session
@udf(name="calculate_tax",
     return_type=FloatType(),
     input_types=[FloatType(), FloatType()],
     replace=True)
def calculate_tax(amount: float, tax_rate: float) -> float:
    """Calculate tax amount given a price and tax rate percentage."""
    return amount * (tax_rate / 100)
# Use the UDF in SQL from the same session; tax_rate is a percentage, so 8% is 8
result = session.sql("""
SELECT product, amount,
calculate_tax(amount, 8) AS tax,
amount + calculate_tax(amount, 8) AS total_with_tax
FROM sales
""").collect()
# The decorator returns a UserDefinedFunction that also works in the DataFrame API
taxed_df = session.table("sales").select(
    col("product"), col("amount"),
    calculate_tax(col("amount"), lit(8.0)).alias("tax")
)
# Example 3: Table UDF (UDTF)
# UDTFs can return multiple rows per input row (1-to-many expansion)
# Called in SQL through TABLE(...), often laterally joined to a source table
# In Snowpark Python the UDTF handler is a class whose process() method
# yields output rows as tuples
from snowflake.snowpark.functions import udtf
from snowflake.snowpark.types import StructType, StructField, StringType, IntegerType
# output_schema defines the columns of the returned rows
# input_types defines the parameters of process()
@udtf(name="parse_json_object",
      output_schema=StructType([
          StructField("key", StringType()),
          StructField("value", StringType())
      ]),
      input_types=[StringType()],
      replace=True)
class ParseJsonObject:
    def process(self, json_str: str):
        """Parse a JSON object and yield its key-value pairs as rows."""
        import json
        data = json.loads(json_str)
        for key, value in data.items():
            yield (key, str(value))  # each yielded tuple becomes one output row
# The decorator already registered the UDTF; call it from SQL
result = session.sql("""
SELECT *
FROM TABLE(parse_json_object('{"a": 1, "b": 2, "c": 3}'))
""").collect()
# UDTF with multiple input columns
@udtf(name="split_string",
      output_schema=StructType([
          StructField("token", StringType()),
          StructField("position", IntegerType())
      ]),
      input_types=[StringType(), StringType()],
      replace=True)
class SplitString:
    def process(self, text: str, delimiter: str):
        """Split a string by delimiter and yield tokens with their positions."""
        for i, token in enumerate(text.split(delimiter)):
            yield (token, i)
# The output columns are TOKEN and POSITION
result = session.sql("""
SELECT token, position
FROM TABLE(split_string('hello-world-snowflake', '-'))
""").collect()
Stored Procedures for Complex Workflows
# Example 4: Stored procedure with full parameter documentation
from snowflake.snowpark import Session
from snowflake.snowpark.functions import col, sum as sum_
from snowflake.snowpark.types import StringType
def process_sales_data(session: Session, region: str) -> str:
    """
    Process sales data for a specific region.
    Args:
        session: Snowpark session for data access (supplied by Snowflake, not by the caller)
        region: Geographic region code (e.g., 'US', 'EU', 'APAC')
    Returns:
        Summary string with processing statistics
    """
    # Read data using session.table() - returns lazy DataFrame
    df = session.table("raw_sales")
    # Filter: becomes a WHERE clause in the generated SQL
    filtered = df.filter(col("region") == region.upper())
    # Aggregate: GROUP BY with SUM
    aggregated = filtered.group_by("product") \
        .agg(sum_(col("amount")).alias("total"))
    # Write results using save_as_table()
    # mode="overwrite": drops and recreates the table
    # mode="append": inserts into the existing table
    aggregated.write.mode("overwrite") \
        .save_as_table(f"sales_{region.lower()}")
    # count() triggers execution and returns an int
    row_count = filtered.count()
    return f"Processed {row_count} rows for {region}"
# Register the procedure under an explicit name so it can be called by name
# return_type: Snowflake type of the result
# input_types: Snowflake types of the parameters after the session argument
# packages: snowflake-snowpark-python must be available in the procedure runtime
session.sproc.register(
    process_sales_data,
    name="process_sales_data",
    return_type=StringType(),
    input_types=[StringType()],
    packages=["snowflake-snowpark-python"],
    replace=True
)
# Call procedure using session.call()
result = session.call("process_sales_data", "US")
print(result)  # e.g. "Processed <n> rows for US"
# Example 5: Stored procedure with explicit transaction management
from snowflake.snowpark import Session
from snowflake.snowpark.functions import col
from snowflake.snowpark.types import IntegerType, StringType
def transfer_inventory(session: Session, source_store: str, dest_store: str, product_id: str, quantity: int) -> str:
    """
    Transfer inventory between stores with transaction safety.
    Statements autocommit unless a transaction is opened explicitly, so
    BEGIN/COMMIT/ROLLBACK are what make the two updates atomic:
    - If both operations succeed, changes are committed
    - If either operation fails, all changes are rolled back
    """
    session.sql("BEGIN").collect()
    try:
        # Read source inventory
        source_record = session.table("inventory").filter(
            (col("store_id") == source_store) &
            (col("product_id") == product_id)
        ).collect()
        if not source_record:
            session.sql("ROLLBACK").collect()
            return f"Error: Product {product_id} not found in store {source_store}"
        current_qty = source_record[0]["QUANTITY"]
        if current_qty < quantity:
            session.sql("ROLLBACK").collect()
            return f"Error: Insufficient inventory ({current_qty} available, {quantity} requested)"
        # Deduct from source; bind parameters (?) instead of f-strings avoid SQL injection
        session.sql("""
            UPDATE inventory
            SET quantity = quantity - ?,
                last_updated = CURRENT_TIMESTAMP()
            WHERE store_id = ?
              AND product_id = ?
        """, params=[quantity, source_store, product_id]).collect()
        # Add to destination (upsert)
        session.sql("""
            MERGE INTO inventory t
            USING (SELECT ? AS store_id, ? AS product_id, ? AS quantity) s
            ON t.store_id = s.store_id AND t.product_id = s.product_id
            WHEN MATCHED THEN UPDATE SET quantity = t.quantity + s.quantity
            WHEN NOT MATCHED THEN INSERT (store_id, product_id, quantity)
                VALUES (s.store_id, s.product_id, s.quantity)
        """, params=[dest_store, product_id, quantity]).collect()
        session.sql("COMMIT").collect()
        return f"Transferred {quantity} units of {product_id} from {source_store} to {dest_store}"
    except Exception as e:
        # Nothing is undone automatically; roll back the open transaction explicitly
        session.sql("ROLLBACK").collect()
        return f"Transfer failed: {e}"
session.sproc.register(
    transfer_inventory,
    name="transfer_inventory",
    return_type=StringType(),
    input_types=[StringType(), StringType(), StringType(), IntegerType()],
    packages=["snowflake-snowpark-python"],
    replace=True
)
Advanced DataFrame Operations
# Example 6: Advanced DataFrame transformations
from snowflake.snowpark import Window
from snowflake.snowpark.functions import (
    col, concat, datediff, lit, rank, row_number, sum as sum_, to_date, when
)
# Complex transformations with conditional logic
df = session.table("customers") \
    .with_column("age_group",
        when(col("age") < 25, lit("Young"))
        .when(col("age") < 50, lit("Adult"))
        .otherwise(lit("Senior"))
    ) \
    .with_column("full_name",
        concat(col("first_name"), lit(" "), col("last_name"))
    ) \
    .with_column("membership_years",
        datediff("year", col("join_date"), to_date(lit("2024-01-01")))
    ) \
    .select("customer_id", "full_name", "age_group", "email", "membership_years")
# Write the result to a table, replacing it if it exists
df.write.mode("overwrite").save_as_table("customer_segments")
# Window functions for analytics
window_spec = Window.partition_by("region").order_by(col("sales_amount").desc())
# Explicit ROWS frame so ties on sale_date still accumulate row by row
running_spec = Window.partition_by("region").order_by("sale_date") \
    .rows_between(Window.UNBOUNDED_PRECEDING, Window.CURRENT_ROW)
ranked_df = session.table("sales") \
    .with_column("sales_rank", rank().over(window_spec)) \
    .with_column("running_total", sum_(col("sales_amount")).over(running_spec))
# Join operations
customers_df = session.table("customers")
orders_df = session.table("orders")
joined_df = orders_df.join(
    customers_df,
    orders_df["customer_id"] == customers_df["customer_id"],
    how="left",  # also "inner", "right", "outer", "cross", "semi", "anti"
    rsuffix="_cust"  # renames overlapping columns from the right side
)
# Deduplication: keeps one arbitrary row per order_id
deduped_df = orders_df.drop_duplicates(["order_id"])
# Keep the most recent order per customer: drop_duplicates() ignores any earlier
# order_by(), so rank rows explicitly with row_number()
latest_spec = Window.partition_by("customer_id").order_by(col("created_at").desc())
latest_df = orders_df.with_column("rn", row_number().over(latest_spec)) \
    .filter(col("rn") == 1) \
    .drop("rn")
Error Handling and Production Patterns
# Example 7: Robust ETL with error handling and logging
import logging
from snowflake.snowpark import Session
from snowflake.snowpark.functions import col
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def robust_etl(session: Session, source_table: str, target_table: str) -> str:
    """
    Production-grade ETL with data quality validation and error handling.
    Features:
    - Null detection and handling
    - Duplicate removal
    - Row count validation
    - Detailed logging for debugging
    """
    try:
        # Read source data
        source_df = session.table(source_table)
        # Data quality check: count nulls in the key column
        null_count = source_df.filter(col("id").is_null()).count()
        if null_count > 0:
            logger.warning(f"Found {null_count} null IDs in {source_table}")
        # Data quality check: count duplicate IDs
        total_rows = source_df.count()
        unique_rows = source_df.drop_duplicates(["id"]).count()
        duplicate_count = total_rows - unique_rows
        if duplicate_count > 0:
            logger.warning(f"Found {duplicate_count} duplicate rows")
        # Clean data
        clean_df = source_df.filter(col("id").is_not_null()) \
            .drop_duplicates(["id"])
        # Write results
        clean_df.write.mode("overwrite").save_as_table(target_table)
        # Count the written table instead of re-running the cleaning query
        final_count = session.table(target_table).count()
        logger.info(f"ETL complete: {final_count} rows written to {target_table}")
        removed = total_rows - final_count
        return f"Success: Processed {final_count} rows (removed {removed} null or duplicate rows)"
    except Exception as e:
        logger.error(f"ETL failed: {e}")
        raise
# Example 8: DataFrame caching for repeated access
# cache_result() runs the query once, stores the result in a temporary table,
# and returns a Table backed by it, so later actions read the cached rows
from snowflake.snowpark import Session
from snowflake.snowpark.functions import avg, col, sum as sum_
def analyze_sales(session: Session) -> dict:
    """Analyze sales data with caching for efficiency."""
    base_df = session.table("sales_data")
    # Cache the filtered data so the aggregations below do not re-filter
    filtered_df = base_df.filter(col("order_date") >= "2024-01-01").cache_result()
    # Multiple aggregations read the cached data
    total_sales = filtered_df.agg(sum_(col("amount")).alias("total")).collect()[0]["TOTAL"]
    avg_order = filtered_df.agg(avg(col("amount")).alias("avg")).collect()[0]["AVG"]
    order_count = filtered_df.count()
    # Drop the temporary table backing the cache
    filtered_df.drop_table()
    return {
        "total_sales": total_sales,
        "avg_order_value": avg_order,
        "order_count": order_count
    }
# Example 9: Reading semi-structured data
# Snowpark handles VARIANT, OBJECT, ARRAY types natively
json_df = session.table("json_events") \
.select(
col("event_id"),
col("payload")["event_type"].as_("event_type"), # Extract from OBJECT
col("payload")["timestamp"].as_("timestamp"),
col("payload")["data"].as_("event_data")
)
# Example 10: Writing to stages for external systems
# copy_into_location() runs COPY INTO @stage to unload the data as files
session.table("analytics_output") \
    .write \
    .copy_into_location(
        "@my_stage/output/",
        file_format_type="parquet",
        header=True,  # keep column names in the output files
        format_type_options={"COMPRESSION": "SNAPPY"}
    )
Performance Metrics
| Metric | Target | Warning | Critical |
|---|---|---|---|
| DataFrame Translation Time | < 1s | 1-5s | > 5s |
| UDF Execution Time | < 100ms | 100-500ms | > 500ms |
| Stored Procedure Time | < 30s | 30-120s | > 120s |
| Data Transfer Latency | < 1s | 1-10s | > 10s |
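One way to apply these thresholds is a small classifier over measured durations. The sketch hard-codes the table's boundaries in seconds and follows them literally: below the target bound is on target, up to and including the upper warning bound is a warning, anything above is critical. classify_metric and the metric keys are hypothetical names.

```python
# (target upper bound, warning upper bound) in seconds, from the table above
THRESHOLDS = {
    "dataframe_translation": (1.0, 5.0),
    "udf_execution": (0.1, 0.5),
    "stored_procedure": (30.0, 120.0),
    "data_transfer_latency": (1.0, 10.0),
}


def classify_metric(metric: str, seconds: float) -> str:
    """Return 'target', 'warning', or 'critical' for a measured duration."""
    target_max, warning_max = THRESHOLDS[metric]
    if seconds < target_max:
        return "target"
    if seconds <= warning_max:
        return "warning"
    return "critical"


print(classify_metric("stored_procedure", 45))  # warning
```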
Best Practices
- Optimize DataFrame operations: Filter early and select only the columns you need, so Snowflake can prune micro-partitions and columns and minimize data movement.
- Keep UDFs simple: Avoid expensive operations in UDFs. Use vectorized UDFs for large datasets.
- Manage sessions properly: Use appropriate warehouse sizes and reuse sessions rather than opening one per operation in production workloads.
- Handle errors gracefully: Implement try-except blocks and logging in stored procedures.
- Test with representative data: Validate performance and correctness with data volumes similar to production.
- Use appropriate return types: Choose the narrowest type that fits the result (for example IntegerType rather than StringType for counts).
- Leverage built-in functions: Use Snowpark's built-in functions instead of custom UDFs when possible.
- Monitor execution metrics: Track DataFrame translation times, UDF execution times, and data transfer volumes.
- Implement transaction management: Wrap related statements in stored procedures in explicit BEGIN/COMMIT, with ROLLBACK on error, for data consistency.
- Document code thoroughly: Include comments explaining complex logic and data transformations.
Additional Theory: Lazy Evaluation in Snowpark
Snowpark DataFrames use lazy evaluation: operations build a computation plan but do not execute until an action is invoked. This mirrors Apache Spark's model and provides several benefits:
- Query optimization: Because the whole plan reaches Snowflake as SQL, Snowflake's optimizer can reorder operations, push predicates down, and eliminate unnecessary columns before execution.
- Reduced data transfer: Only the final result is returned to the client, not intermediate results.
- Batch processing: Multiple operations are fused into a single SQL execution plan.
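Column pruning can be illustrated with a toy plan walk: the scan only needs the columns referenced somewhere in the plan, not every column in the table. The plan format (a list of (operation, columns) tuples) and the required_columns and pruned_scan functions are hypothetical; Snowflake's optimizer performs this internally on the generated SQL.

```python
from typing import List, Sequence, Set, Tuple

# Each step: (operation name, columns it references)
Plan = List[Tuple[str, Sequence[str]]]


def required_columns(plan: Plan) -> Set[str]:
    """Columns the table scan must read: the union of columns any step references."""
    needed: Set[str] = set()
    for _op, cols in plan:
        needed.update(cols)
    return needed


def pruned_scan(table: str, table_columns: Sequence[str], plan: Plan) -> str:
    """Build the scan a pruning optimizer would issue, keeping table column order."""
    needed = required_columns(plan)
    kept = [c for c in table_columns if c in needed]
    return f"SELECT {', '.join(kept)} FROM {table}"


plan = [
    ("filter", ["amount"]),
    ("group_by", ["region"]),
    ("agg", ["amount"]),
]
print(pruned_scan("sales_data", ["order_id", "region", "amount", "notes", "created_at"], plan))
# SELECT region, amount FROM sales_data
```

Three of the five columns never leave storage, which is the data-transfer saving the list above describes.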
Actions that trigger execution:
- collect(): Returns all rows to the client
- count(): Returns the total row count
- show(): Prints formatted output (the first 10 rows by default)
- write.mode().save_as_table(): Writes data to a table
- to_pandas(): Converts to a pandas DataFrame (client-side)
Transformations that are lazy:
filter(), select(), group_by(), agg(), join(), with_column(), order_by()
Additional Theory: UDF Performance Considerations
| Factor | Impact | Recommendation |
|---|---|---|
| Python runtime overhead | Per-call cost paid on every row | Use vectorized UDFs for large datasets |
| Package imports | Cold start cost | Keep imports minimal; pre-package dependencies |
| Serialization | Converting values between SQL and Python types | Minimize data transfer between SQL and Python |
| Memory | Limited by warehouse node | Avoid large in-UDF state; consider Snowpark-optimized warehouses for memory-heavy work |
| Vectorized execution | Typically faster than row-at-a-time | Prefer numeric operations that benefit from vectorization |
See Also
- 12-Cortex-AI - ML model serving via Snowpark
- 15-Cross-Platform-Integration - Spark integration patterns
- 03-Warehouses-and-Auto-Suspend - Warehouse sizing for Snowpark workloads
- PySpark Iceberg - PySpark DataFrame patterns
- Delta Lake on Databricks - Delta Lake DataFrame API
- Data Warehouse Concepts - Data warehouse design principles