Snowflake Snowpark for Python
The Snowpark library provides intuitive APIs for querying and processing data in a data pipeline. Using this library, you can build applications that process data in Snowflake without having to move data to the system where your application code runs.
Source code | Developer guide | API reference | Product documentation | Samples
If you don't have a Snowflake account yet, you can sign up for a 30-day free trial account.
You can use miniconda, anaconda, or virtualenv to create a Python 3.8, 3.9, 3.10, or 3.11 virtual environment.
For the best experience when using Snowpark with UDFs, creating a local conda environment with the Snowflake channel is recommended.
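For example, a suitable conda environment could be created like this; the environment name and Python version shown are only illustrative:
conda create --name snowpark -c https://repo.anaconda.com/pkgs/snowflake python=3.10
conda activate snowpark
With the environment activated, install the library: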
pip install snowflake-snowpark-python
If you want to use pandas-related features, also install pandas in the same environment:
pip install "snowflake-snowpark-python[pandas]"
from snowflake.snowpark import Session

# Fill in your own account details to establish a session.
connection_parameters = {
    "account": "<your snowflake account>",
    "user": "<your snowflake user>",
    "password": "<your snowflake password>",
    "role": "<snowflake user role>",
    "warehouse": "<snowflake warehouse>",
    "database": "<snowflake database>",
    "schema": "<snowflake schema>"
}
session = Session.builder.configs(connection_parameters).create()
# Create a DataFrame, filter it, and inspect the results.
df = session.create_dataframe([[1, 2], [3, 4]], schema=["a", "b"])
df = df.filter(df.a > 1)
df.show()
pandas_df = df.to_pandas()  # this requires pandas installed in the Python environment
result = df.collect()       # returns a list of Row objects
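To persist results back to Snowflake, you can also write the DataFrame to a table; the table name below is only a placeholder:
df.write.save_as_table("my_table", mode="overwrite")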
The Developer Guide and API references have basic sample code. Snowflake-Labs has more curated demos.
To see Snowpark Python API logs, configure the logging level for snowflake.snowpark.
Snowpark uses the Snowflake Python Connector, so you may also want to configure the logging level for snowflake.connector when the error originates in the Python Connector.
For instance:
import logging
for logger_name in ('snowflake.snowpark', 'snowflake.connector'):
    logger = logging.getLogger(logger_name)
    logger.setLevel(logging.DEBUG)
    ch = logging.StreamHandler()
    ch.setLevel(logging.DEBUG)
    ch.setFormatter(logging.Formatter('%(asctime)s - %(threadName)s %(filename)s:%(lineno)d - %(funcName)s() - %(levelname)s - %(message)s'))
    logger.addHandler(ch)
Please refer to CONTRIBUTING.md.
Release History
Added the conn_error attribute to SnowflakeSQLException that stores the whole underlying exception from snowflake-connector-python.
Added support for RelationalGroupedDataFrame.pivot() to access pivot in the following pattern: DataFrame.group_by(...).pivot(...).
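For illustration, a grouped pivot might look like the following sketch; the table and column names are made up for the example:
from snowflake.snowpark.functions import sum as sum_
# Pivot monthly sales per employee; assumes a table with empid, month and sales columns
df = session.table("sales")
df.group_by("empid").pivot("month", ["JAN", "FEB"]).agg(sum_("sales")).show()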
Added experimental feature: Local Testing Mode, which allows you to create and operate on Snowpark Python DataFrames locally without connecting to a Snowflake account. You can use the local testing framework to test your DataFrame operations locally, on your development machine or in a CI (continuous integration) pipeline, before deploying code changes to your account.
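A minimal sketch of how this might look; the local_testing configuration key used here is an assumption based on this description, so check the Local Testing documentation for the exact option:
from snowflake.snowpark import Session
# Assumed configuration key; no connection to a Snowflake account is made in this mode
session = Session.builder.config("local_testing", True).create()
df = session.create_dataframe([[1, 2], [3, 4]], schema=["a", "b"])
assert df.filter(df.a > 1).count() == 1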
Added the new function arrays_to_object in snowflake.snowpark.functions.
Added support for the vector data type.
Updated the cloudpickle dependency to cloudpickle==2.2.1.
Updated snowflake-connector-python to 3.4.0.
Fixed a bug where session.read.with_metadata creates an inconsistent table when doing df.write.save_as_table.
Added support for managing case sensitivity in DataFrame.to_local_iterator().
Added support for input_names in UDTFRegistration.register/register_file and functions.pandas_udtf. By default, RelationalGroupedDataFrame.applyInPandas will infer the column names from the current dataframe schema.
Added sql_error_code and raw_message attributes to SnowflakeSQLException when it is caused by a SQL exception.
Fixed a bug in DataFrame.to_pandas() where converting Snowpark dataframes to pandas dataframes was losing precision on integers with more than 19 digits.
Fixed a bug where session.add_packages could not handle a requirement specifier that contains a project name with an underscore and a version.
Fixed a bug in DataFrame.limit() when offset is used and the parent DataFrame uses limit. Now the offset won't impact the parent DataFrame's limit.
Fixed a bug in DataFrame.write.save_as_table where dataframes created from the read API could not save data into Snowflake because of the invalid column name $1.
Changed the behavior of date_format: the format argument changed from optional to required.
When a generator function (normal, zipf, uniform, seq1, seq2, seq4, seq8) is used, the sort and filter operation will no longer be flattened when generating the query.
Updated the typing-extensions dependency.
Improved Dataframe.writer.save_as_table, which does not need insert permission for writing tables.
Added support for PythonObjJSONEncoder json-serializable objects for ARRAY and OBJECT literals.
Added support for the VOLATILE/IMMUTABLE keyword when registering UDFs.
Added support for specifying clustering keys when saving dataframes using DataFrame.save_as_table.
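As a hedged sketch, this could look like the following; the clustering_keys parameter name is inferred from this entry and should be checked against the API reference:
from snowflake.snowpark.functions import col
# 'clustering_keys' is an assumed parameter name for this feature
df.write.save_as_table("my_clustered_table", mode="overwrite", clustering_keys=[col("a")])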
Accept Iterable objects as input for schema when creating dataframes using Session.create_dataframe.
Added the property DataFrame.session to return a Session object.
Added the property Session.session_id to return an integer that represents session ID.
Added the property Session.connection to return a SnowflakeConnection object.
Added support for creating a Snowpark session from a configuration file or environment variables.
Updated snowflake-connector-python to 3.2.0.
Fixed a bug where a ValueError was raised even when compatible package versions were added in session.add_packages.
Fixed a bug when registering stored procedures with register_from_file.
Fixed a bug that caused an invalid_identifier error.
Fixed a bug where DataFrame.copy disables the SQL simplifier for the returned copy.
Fixed a bug where session.sql().select() would fail if any parameters are specified to session.sql().
Added parameters external_access_integrations and secrets when creating a UDF, UDTF or Stored Procedure from Snowpark Python to allow integration with external access.
Added new functions array_flatten and flatten in snowflake.snowpark.functions.
Added support for apply_in_pandas in snowflake.snowpark.relational_grouped_dataframe.
Added support for replicating your local Python environment on Snowflake via Session.replicate_local_environment.
Fixed a bug where session.create_dataframe fails to properly set nullable columns when nullability was affected by the order in which data was given.
Fixed a bug where DataFrame.select could not identify and alias columns in the presence of table functions when output columns of the table function overlapped with columns in the dataframe.
Creating UDFs, UDTFs and stored procedures with is_permanent=False will now create temporary objects even when stage_name is provided. The default value of is_permanent is False, which is why, if this value is not explicitly set to True for permanent objects, users will notice a change in behavior.
types.StructField now enquotes column identifiers by default.
Added new functions array_sort, sort_array, array_min, array_max and explode_outer in snowflake.snowpark.functions.
Added support for pure Python packages specified via Session.add_requirements or Session.add_packages. They are now usable in stored procedures and UDFs even if the packages are not present on the Snowflake Anaconda channel.
Added Session parameters custom_packages_upload_enabled and custom_packages_force_upload_enabled to enable the support for the pure Python packages feature mentioned above. Both parameters default to False.
Added support for specifying package requirements by passing a conda environment yaml file to Session.add_requirements.
Added support for renaming multiple columns in DataFrame.rename.
Added support for params in session.sql() in stored procedures.
Added support for the Snowflake timestamp variants (TIMESTAMP_NTZ, TIMESTAMP_LTZ, TIMESTAMP_TZ):
Added TimestampTimezone as an argument in the TimestampType constructor.
Added type hints NTZ, LTZ, TZ and Timestamp to annotate functions when registering UDFs.
Updated the typing-extensions dependency.
DataFrame.cache_result now creates temp tables with fully qualified names under the current database and current schema.
Fixed a bug when creating a UDF from numpy.ufunc.
Fixed a bug where DataFrame.union was not generating the correct Selectable.schema_query when the SQL simplifier is enabled.
DataFrameWriter.save_as_table now respects the nullable field of the schema provided by the user or the inferred schema based on data from user input.
Updated snowflake-connector-python to 3.0.4.
DataFrame.agg and DataFrame.describe no longer strip away non-printing characters from column names.
Added new functions array_generate_range, array_unique_agg, collect_set and sequence in snowflake.snowpark.functions.
Added support for registering and calling stored procedures with a TABLE return type.
Added support for the parameter length in StringType() to specify the maximum number of characters that can be stored by the column.
Added the alias functions.element_at() for functions.get().
Added the alias Column.contains for functions.contains.
Added experimental feature DataFrame.alias.
Added support for querying metadata columns from a stage when creating a DataFrame using DataFrameReader.
Added support for StructType.add to append more fields to existing StructType objects.
Added support for the parameter execute_as in StoredProcedureRegistration.register_from_file() to specify stored procedure caller rights.
Fixed a bug where Dataframe.join_table_function did not run all of the necessary queries to set up the join table function when the SQL simplifier was enabled.
Fixed type hint declarations for ColumnOrName, ColumnOrLiteralStr, ColumnOrSqlExpr, LiteralType and ColumnOrLiteral that were breaking mypy checks.
Fixed a bug where DataFrameWriter.save_as_table and DataFrame.copy_into_table failed to parse fully qualified table names.
Added support for session.getOrCreate.
Added support for the alias Column.getField.
Added new functions in snowflake.snowpark.functions: date_add and date_sub to make add and subtract operations easier, daydiff, explode, array_distinct, regexp_extract, struct, format_number, bround and substring_index.
Added parameter skip_upload_on_content_match when creating UDFs, UDTFs and stored procedures using register_from_file to skip uploading files to a stage if the same version of the files are already on the stage.
Added support for the DataFrameWriter.save_as_table method to take table names that contain dots.
Flattened the generated SQL when DataFrame.filter() or DataFrame.order_by() is followed by a projection statement (e.g. DataFrame.select(), DataFrame.with_column()).
Added experimental feature Dataframe.create_or_replace_dynamic_table.
Added an optional argument params in session.sql() to support binding variables. Note that this is not supported in stored procedures yet.
Fixed a bug in strtok_to_array where an exception was thrown when a delimiter was passed in.
Fixed a bug in session.add_import where the module had the same namespace as other dependencies.
Added support for the delimiters parameter in functions.initcap().
Added support for functions.hash() to accept a variable number of input expressions.
Added API Session.RuntimeConfig for getting/setting/checking the mutability of any runtime configuration.
Added support for managing case sensitivity in Row results from DataFrame.collect using the case_sensitive parameter.
Added API Session.conf for getting, setting or checking the mutability of any runtime configuration.
Added indexer support for snowflake.snowpark.types.StructType.
Added a keyword argument log_on_exception to Dataframe.collect and Dataframe.collect_no_wait to optionally disable error logging for SQL exceptions.
Fixed a bug where a DataFrame set operation (DataFrame.substract, DataFrame.union, etc.) being called after another DataFrame set operation and DataFrame.select or DataFrame.with_column throws an exception.
Simplified JOIN queries to use constant subquery aliases (SNOWPARK_LEFT, SNOWPARK_RIGHT) by default. Users can disable this at runtime with session.conf.set('use_constant_subquery_alias', False) to use randomly generated alias names instead.
Allowed specifying statement parameters in session.call().
Source code display for registered objects can be turned off by specifying source_code_display=False at registration.
Added parameter if_not_exists when creating a UDF, UDTF or Stored Procedure from Snowpark Python to ignore creating the specified function or procedure if it already exists.
Accept integers when calling snowflake.snowpark.functions.get to extract a value from an array.
Added functions.reverse in functions to open access to the Snowflake built-in function reverse.
Added parameter require_scoped_url in snowflake.snowpark.files.SnowflakeFile.open() (in Private Preview) to replace is_owner_file, which is marked for deprecation.
The paramstyle is now set to qmark when creating a Snowpark session.
Fixed a bug where df.join(..., how="cross") fails with SnowparkJoinException: (1112): Unsupported using join type 'Cross'.
Fixed a bug where querying a DataFrame column created from chained function calls used a wrong column name.
Added asc, asc_nulls_first, asc_nulls_last, desc, desc_nulls_first, desc_nulls_last, date_part and unix_timestamp in functions.
Added support for DataFrame.dtypes to return a list of column name and data type pairs.
Added the alias functions.expr() for functions.sql_expr().
Added the alias functions.date_format() for functions.to_date().
Added the alias functions.monotonically_increasing_id() for functions.seq8().
Added the alias functions.from_unixtime() for functions.to_timestamp().
The Snowflake parameter PYTHON_SNOWPARK_USE_SQL_SIMPLIFIER is True after Snowflake 7.3 was released. In snowpark-python, session.sql_simplifier_enabled reads the value of PYTHON_SNOWPARK_USE_SQL_SIMPLIFIER by default, meaning that the SQL simplifier is enabled by default after the Snowflake 7.3 release. To turn this off, set PYTHON_SNOWPARK_USE_SQL_SIMPLIFIER in Snowflake to False or run session.sql_simplifier_enabled = False from Snowpark. It is recommended to use the SQL simplifier because it helps to generate more concise SQL.
Added Session.generator() to create a new DataFrame using the Generator table function.
Added a parameter secure to the functions that create a secure UDF or UDTF.
Added Session.create_async_job() to create an AsyncJob instance from a query id.
AsyncJob.result() now accepts argument result_type to return the results in different formats.
AsyncJob.to_df() returns a DataFrame built from the result of this asynchronous job.
AsyncJob.query() returns the SQL text of the executed query.
DataFrame.agg() and RelationalGroupedDataFrame.agg() now accept variable-length arguments.
Added parameters lsuffix and rsuffix to DataFrame.join() and DataFrame.cross_join() to conveniently rename overlapping columns.
Added Table.drop_table() so you can drop the temp table after DataFrame.cache_result(). Table is also a context manager, so you can use the with statement to drop the cache temp table after use.
Added Session.use_secondary_roles().
Added functions first_value() and last_value(). (contributed by @chasleslr)
Added on as an alias for using_columns and how as an alias for join_type in DataFrame.join().
Fixed a bug in Session.create_dataframe() that raised an error when schema names had special characters.
Fixed a bug in which options set in Session.read.option() were not passed to DataFrame.copy_into_table() as default values.
Fixed a bug in which DataFrame.copy_into_table() raises an error when a copy option has single quotes in the value.
Session.add_packages() now raises ValueError when the version of a package cannot be found in the Snowflake Anaconda channel. Previously, Session.add_packages() succeeded, and a SnowparkSQLException exception was raised later in the UDF/SP registration step.
Added FileOperation.get_stream() to support downloading stage files as a stream.
Added support in functions.ntiles() to accept an int argument.
Added the alias functions.call_function() for functions.call_builtin().
Added the alias functions.function() for functions.builtin().
Added the alias DataFrame.order_by() for DataFrame.sort().
Added the alias DataFrame.orderBy() for DataFrame.sort().
Changed DataFrame.cache_result() to return a more accurate Table class instead of a DataFrame class.
Added support to allow session as the first argument when calling StoredProcedure.
Improved nested query generation by flattening queries when applicable. This improvement can be enabled by setting Session.sql_simplifier_enabled = True. DataFrame.select(), DataFrame.with_column(), DataFrame.drop() and other select-related APIs have more flattened SQLs. DataFrame.union(), DataFrame.union_all(), DataFrame.except_(), DataFrame.intersect() and DataFrame.union_by_name() have flattened SQLs generated when multiple set operators are chained.
Fixed a bug in which Table.update(), Table.delete() and Table.merge() try to reference a temp table that does not exist.
Added a parameter block to the following action APIs on Snowpark dataframes (which execute queries) to allow asynchronous evaluations: DataFrame.collect(), DataFrame.to_local_iterator(), DataFrame.to_pandas(), DataFrame.to_pandas_batches(), DataFrame.count(), DataFrame.first(), DataFrameWriter.save_as_table(), DataFrameWriter.copy_into_location(), Table.delete(), Table.update() and Table.merge().
Added DataFrame.collect_nowait() to allow asynchronous evaluations.
Added class AsyncJob to retrieve results from asynchronously executed queries and check their status.
Added support for table_type in Session.write_pandas(). You can now choose from these table_type options: "temporary", "temp", and "transient".
Added support for using Python structured data (list, tuple and dict) as literal values in Snowpark.
Added keyword argument execute_as to functions.sproc() and session.sproc.register() to allow registering a stored procedure as a caller or owner.
Fixed a bug in which DataFrame.copy_into_table() and DataFrameWriter.save_as_table() mistakenly created a new table if the table name is fully qualified and the table already exists.
Deprecated create_temp_table in Session.write_pandas().
Updated snowflake-connector-python to 2.7.12.
Source code display can be turned off by passing source_code_display as False when calling register() or @udf().
Added support for DataFrame.select(), DataFrame.with_column() and DataFrame.with_columns(), which now take parameters of type table_function.TableFunctionCall for columns.
Added parameter overwrite to session.write_pandas() to allow overwriting the contents of a Snowflake table with that of a pandas DataFrame.
Added parameter column_order to df.write.save_as_table() to specify the matching rules when inserting data into a table in append mode.
Added FileOperation.put_stream() to upload local files to a stage via a file stream.
Added methods TableFunctionCall.alias() and TableFunctionCall.as_() to allow aliasing the names of columns that come from the output of table function joins.
Added get_active_session() in module snowflake.snowpark.context to get the current active Snowpark session.
Fixed a bug that occurred when statement_params is not passed to the function.
Fixed a bug that occurred when session.create_dataframe() is called with dicts and a given schema.
Fixed a bug in df.write.save_as_table().
Updated function.uniform() to infer the types of inputs max_ and min_ and cast the limits to IntegerType or FloatType correspondingly.
Added parameter statement_params to the following methods to allow for specifying statement level parameters:
collect, to_local_iterator, to_pandas, to_pandas_batches, count, copy_into_table, show, create_or_replace_view, create_or_replace_temp_view, first, cache_result and random_split on class snowflake.snowpark.DataFrame.
update, delete and merge on class snowflake.snowpark.Table.
save_as_table and copy_into_location on class snowflake.snowpark.DataFrameWriter.
approx_quantile, statement_params, cov and crosstab on class snowflake.snowpark.DataFrameStatFunctions.
register and register_from_file on class snowflake.snowpark.udf.UDFRegistration.
register and register_from_file on class snowflake.snowpark.udtf.UDTFRegistration.
register and register_from_file on class snowflake.snowpark.stored_procedure.StoredProcedureRegistration.
udf, udtf and sproc in snowflake.snowpark.functions.
Added support for Column as an input argument to session.call().
Added support for table_type in df.write.save_as_table(). You can now choose from these table_type options: "temporary", "temp", and "transient".
Improvements to the session.use_* methods.
Improvements to session.create_dataframe().
Fixed a bug in which session.create_dataframe() mistakenly converted 0 and False to None when the input data was only a list.
Fixed a bug in which session.create_dataframe() using a large local dataset sometimes created a temp table twice.
Aligned the definition of function.trim() with the SQL function definition.
Addressed an issue involving the Python built-in sum vs. the Snowpark function.sum().
Deprecated create_temp_table in df.write.save_as_table().
Added support for UDTFs (user-defined table functions): use snowflake.snowpark.functions.udtf() to register a UDTF, or use it as a decorator to register the UDTF; use Session.udtf.register() to register a UDTF; use Session.udtf.register_from_file() to register a UDTF from a Python file.
Added support for querying table functions: use snowflake.snowpark.functions.table_function() to create a callable representing a table function and use it to call the table function in a query; use snowflake.snowpark.functions.call_table_function() to call a table function.
Added support for an over clause that specifies partition by and order by when lateral joining a table function.
Updated Session.table_function() and DataFrame.join_table_function() to accept TableFunctionCall instances.
In functions.udf() and functions.sproc(), you can now specify an empty list for the imports or packages argument to indicate that no import or package is used for this UDF or stored procedure. Previously, specifying an empty list meant that the function would use session-level imports or packages.
Improved the __repr__ implementation of data types in types.py. The unused type_name property has been removed.
SQL errors now raise a Snowpark-specific exception instead of ProgrammingError from the Python connector.
Fixed a bug related to DataFrame.to_pandas().
Fixed a bug in which DataFrameReader.parquet() failed to read a parquet file when its column contained spaces.
Fixed a bug in which DataFrame.copy_into_table() failed when the dataframe is created by reading a file with inferred schemas.
Deprecated Session.flatten() and DataFrame.flatten().
Updated the cloudpickle dependency to cloudpickle <= 2.0.0.
Added functions current_session(), current_statement(), current_user(), current_version(), current_warehouse(), date_from_parts(), date_trunc(), dayname(), dayofmonth(), dayofweek(), dayofyear(), grouping(), grouping_id(), hour(), last_day(), minute(), next_day(), previous_day(), second(), month(), monthname(), quarter(), year(), current_database(), current_role(), current_schema(), current_schemas(), current_region(), current_available_roles(), add_months(), any_value(), bitnot(), bitshiftleft(), bitshiftright(), convert_timezone(), uniform(), strtok_to_array(), sysdate(), time_from_parts(), timestamp_from_parts(), timestamp_ltz_from_parts(), timestamp_ntz_from_parts(), timestamp_tz_from_parts(), weekofyear(), percentile_cont() to snowflake.snowpark.functions.
Removed the previously deprecated APIs DataFrame.groupByGroupingSets(), DataFrame.naturalJoin(), DataFrame.joinTableFunction, DataFrame.withColumns(), Session.getImports(), Session.addImport(), Session.removeImport(), Session.clearImports(), Session.getSessionStage(), Session.getDefaultDatabase(), Session.getDefaultSchema(), Session.getCurrentDatabase(), Session.getCurrentSchema(), Session.getFullyQualifiedCurrentSchema().
Added support for creating an empty DataFrame with a specific schema using the Session.create_dataframe() method.
Changed the logging level from INFO to DEBUG for several logs (e.g., the executed query) when evaluating a dataframe.
Improved the error message in the Session.create_dataframe() method.
Added typing-extension as a new dependency with the version >= 4.1.0.
Added the Session.sproc property and sproc() to snowflake.snowpark.functions, so you can register stored procedures.
Added Session.call to call stored procedures by name.
Added UDFRegistration.register_from_file() to allow registering UDFs from Python source files or zip files directly.
Added UDFRegistration.describe() to describe a UDF.
Added DataFrame.random_split() to provide a way to randomly split a dataframe.
Added functions md5(), sha1(), sha2(), ascii(), initcap(), length(), lower(), lpad(), ltrim(), rpad(), rtrim(), repeat(), soundex(), regexp_count(), replace(), charindex(), collate(), collation(), insert(), left(), right(), endswith() to snowflake.snowpark.functions.
Allowed call_udf() to accept literal values.
Provided a distinct keyword in array_agg().
Fixed an issue that caused DataFrame.to_pandas() to have a string column if Column.cast(IntegerType()) was used.
Fixed a bug in DataFrame.describe() when there is more than one string column.
Added methods add_packages(), get_packages(), clear_packages(), and remove_package() to class Session.
Added add_requirements() to Session so you can use a requirements file to specify which packages this session will use.
Added parameter packages to function snowflake.snowpark.functions.udf() and method UserDefinedFunction.register() to indicate UDF-level Anaconda package dependencies when creating a UDF.
Added parameter imports to snowflake.snowpark.functions.udf() and UserDefinedFunction.register() to specify UDF-level code imports.
Added parameter session to function udf() and UserDefinedFunction.register() so you can specify which session to use to create a UDF if you have multiple sessions.
Added types Geography and Variant to snowflake.snowpark.types to be used as type hints for Geography and Variant data when defining a UDF.
Added Table, a subclass of DataFrame for table operations: methods update and delete update and delete rows of a table in Snowflake; method merge merges data from a DataFrame to a Table; method DataFrame.sample() is overridden with an additional parameter seed, which works on tables but not on views and sub-queries.
Added DataFrame.to_local_iterator() and DataFrame.to_pandas_batches() to allow getting results from an iterator when the result set returned from the Snowflake database is too large.
Added DataFrame.cache_result() for caching the operations performed on a DataFrame in a temporary table. Subsequent operations on the original DataFrame have no effect on the cached result DataFrame.
Added property DataFrame.queries to get the SQL queries that will be executed to evaluate the DataFrame.
Added Session.query_history() as a context manager to track SQL queries executed on a session, including all SQL queries to evaluate DataFrames created from a session. Both query ID and query text are recorded.
You can now create a Session instance from an existing established snowflake.connector.SnowflakeConnection; use parameter connection in Session.builder.configs().
Added use_database(), use_schema(), use_warehouse(), and use_role() to class Session to switch database/schema/warehouse/role after a session is created.
Added DataFrameWriter.copy_into_location() to unload a DataFrame to stage files.
Added DataFrame.unpivot().
Added Column.within_group() for sorting the rows by columns with some aggregation functions.
Added functions listagg(), mode(), div0(), acos(), asin(), atan(), atan2(), cos(), cosh(), sin(), sinh(), tan(), tanh(), degrees(), radians(), round(), trunc(), and factorial() to snowflake.snowpark.functions.
Added an optional argument ignore_nulls in functions lead() and lag().
The condition parameter of functions when() and iff() now accepts SQL expressions.
Deprecated these methods on class Session and replaced them with their snake case equivalents: getImports(), addImports(), removeImport(), clearImports(), getSessionStage(), getDefaultDatabase(), getDefaultSchema(), getCurrentDatabase(), getFullyQualifiedCurrentSchema().
Deprecated these methods on class DataFrame and replaced them with their snake case equivalents: groupingByGroupingSets(), naturalJoin(), withColumns(), joinTableFunction().
Property DataFrame.columns is now consistent with DataFrame.schema.names and the Snowflake database Identifier Requirements.
Column.__bool__() now raises a TypeError. This will ban the use of logical operators and, or, not on Column objects; for instance col("a") > 1 and col("b") > 2 will raise the TypeError. Use (col("a") > 1) & (col("b") > 2) instead.
Changed PutResult and GetResult to subclass NamedTuple.
Fixed a bug in DataFrame.describe() so that non-numeric and non-string columns are ignored instead of raising an exception.
Updated snowflake-connector-python to 2.7.4.
Added Column.isin(), with an alias Column.in_().
Added Column.try_cast(), which is a special version of cast(). It tries to cast a string expression to other types and returns null if the cast is not possible.
Added Column.startswith() and Column.substr() to process string columns.
Column.cast() now also accepts a str value to indicate the cast type in addition to a DataType instance.
Added DataFrame.describe() to summarize stats of a DataFrame.
Added DataFrame.explain() to print the query plan of a DataFrame.
DataFrame.filter() and DataFrame.select_expr() now accept a SQL expression.
Added a bool parameter create_temp_table to methods DataFrame.saveAsTable() and Session.write_pandas() to optionally create a temp table.
Added DataFrame.minus() and DataFrame.subtract() as aliases to DataFrame.except_().
Added regexp_replace(), concat(), concat_ws(), to_char(), current_timestamp(), current_date(), current_time(), months_between(), cast(), try_cast(), greatest(), least(), and hash() to module snowflake.snowpark.functions.
Fixed an issue where Session.createDataFrame(pandas_df) and Session.write_pandas(pandas_df) raise an exception when the pandas DataFrame has spaces in the column name.
DataFrame.copy_into_table() sometimes prints an error level log entry while it actually works. It's fixed now.
Fixed an API docs issue where some DataFrame APIs were missing from the docs.
Updated snowflake-connector-python to 2.7.2, which upgrades the pyarrow dependency to 6.0.x. Refer to the python connector 2.7.2 release notes for more details.
Added the Session.createDataFrame() method for creating a DataFrame from a pandas DataFrame.
Added the Session.write_pandas() method for writing a pandas DataFrame to a table in Snowflake and getting a Snowpark DataFrame object back.
Added new functions cume_dist(), to find the cumulative distribution of a value with regard to other values within a window partition, and row_number(), which returns a unique row number for each row within a window partition.
Added functions for computing statistics for DataFrames in the DataFrameStatFunctions class.
Added functions for handling missing values in a DataFrame in the DataFrameNaFunctions class.
Added new methods rollup(), cube(), and pivot() to the DataFrame class.
Added the GroupingSets class, which you can use with the DataFrame groupByGroupingSets method to perform a SQL GROUP BY GROUPING SETS.
Added the FileOperation(session) class that you can use to upload and download files to and from a stage.
Added the DataFrame.copy_into_table() method for loading data from files in a stage into a table.
In CASE expressions, the functions when() and otherwise() now accept Python types in addition to Column objects.
When you register a UDF, you can now set the replace parameter to True to overwrite an existing UDF with the same name.
Fixed an issue where df.select(when(col("a") == 1, 4).otherwise(col("a"))), [Row(4), Row(2), Row(3)] raised an exception.
Fixed an issue where df.toPandas() raised an exception when a DataFrame was created from large local data.
Start of Private Preview