PySpark Pandas_Udf()

Pyspark Pandas Udf



Преобразование PySpark DataFrame возможно с помощью функции pandas_udf(). Это определяемая пользователем функция, которая применяется к PySpark DataFrame со стрелкой. Мы можем выполнять векторизованные операции, используя функцию pandas_udf(). Это можно реализовать, передав эту функцию в качестве декоратора. Давайте углубимся в это руководство, чтобы узнать синтаксис, параметры и различные примеры.

Тема содержания:

Если вы хотите узнать об установке PySpark DataFrame и модуля, пройдите через это статья .







Pyspark.sql.functions.pandas_udf()

Pandas_udf() доступен в модуле sql.functions в PySpark, который можно импортировать с помощью ключевого слова from. Он используется для выполнения векторных операций в нашем PySpark DataFrame. Эта функция реализована как декоратор, передавая три параметра. После этого мы можем создать пользовательскую функцию, которая возвращает данные в векторном формате (например, мы используем для этого серию/NumPy) с помощью стрелки. В рамках этой функции мы можем вернуть результат.



Структура и синтаксис:



Во-первых, давайте посмотрим на структуру и синтаксис этой функции:

@pandas_udf (тип данных)
def имя_функции (операция) -> convert_format:
оператор возврата

Здесь имя_функции — это имя нашей определенной функции. Тип данных указывает тип данных, возвращаемых этой функцией. Мы можем вернуть результат, используя ключевое слово «return». Все операции выполняются внутри функции с назначением стрелки.





Pandas_udf (функция и возвращаемый тип)

  1. Первый параметр — это определяемая пользователем функция, которая передается ему.
  2. Второй параметр используется для указания типа данных, возвращаемых функцией.

Данные:

Во всем этом руководстве мы используем только один PySpark DataFrame для демонстрации. Все пользовательские функции, которые мы определяем, применяются к этому PySpark DataFrame. Убедитесь, что вы сначала создали этот DataFrame в своей среде после установки PySpark.



импортировать pyspark

из pyspark.sql импортировать SparkSession

linuxhint_spark_app = SparkSession.builder.appName( «Подсказка по Линукс» ).getOrCreate()

из pyspark.sql.functions импортировать pandas_udf

из импорта pyspark.sql.types *

импортировать панд как панду

# детали овощей

овощ =[{ 'тип' : 'овощ' , 'имя' : 'помидор' , 'найти_страну' : 'США' , 'количество' : 800 },

{ 'тип' : 'фрукты' , 'имя' : 'банан' , 'найти_страну' : 'КИТАЙ' , 'количество' : двадцать },

{ 'тип' : 'овощ' , 'имя' : 'помидор' , 'найти_страну' : 'США' , 'количество' : 800 },

{ 'тип' : 'овощ' , 'имя' : 'Манго' , 'найти_страну' : 'ЯПОНИЯ' , 'количество' : 0 },

{ 'тип' : 'фрукты' , 'имя' : 'лимон' , 'найти_страну' : 'ИНДИЯ' , 'количество' : 1700 },

{ 'тип' : 'овощ' , 'имя' : 'помидор' , 'найти_страну' : 'США' , 'количество' : 1200 },

{ 'тип' : 'овощ' , 'имя' : 'Манго' , 'найти_страну' : 'ЯПОНИЯ' , 'количество' : 0 },

{ 'тип' : 'фрукты' , 'имя' : 'лимон' , 'найти_страну' : 'ИНДИЯ' , 'количество' : 0 }

]

# создаем рыночный фрейм данных из приведенных выше данных

market_df = linuxhint_spark_app.createDataFrame(овощ)

market_df.show()

Выход:

Здесь мы создаем этот DataFrame с 4 столбцами и 8 строками. Теперь мы используем pandas_udf() для создания пользовательских функций и применения их к этим столбцам.

Pandas_udf() с разными типами данных

В этом сценарии мы создаем некоторые пользовательские функции с помощью pandas_udf(), применяем их к столбцам и отображаем результаты с помощью метода select(). В каждом случае мы используем pandas.Series при выполнении векторизованных операций. Это рассматривает значения столбца как одномерный массив, и операция применяется к столбцу. В самом декораторе мы указываем тип возврата функции.

Пример 1: Pandas_udf() со строковым типом

Здесь мы создаем две определяемые пользователем функции со строковым типом возвращаемого значения для преобразования значений столбца строкового типа в верхний и нижний регистр. Наконец, мы применяем эти функции к столбцам «type» и «locate_country».

# Преобразование столбца type в верхний регистр с помощью pandas_udf

@pandas_udf (StringType ())

def type_upper_case(i: panda.Series) -> panda.Series:

вернуть i.str.upper()

# Преобразование столбца locate_country в нижний регистр с помощью pandas_udf

@pandas_udf (StringType ())

def country_lower_case(i: panda.Series) -> panda.Series:

вернуть i.str.lower()

# Отобразить столбцы с помощью select()

market_df.select( 'тип' , type_upper_case( 'тип' ), 'найти_страна' ,
страна_нижний_регистр( 'найти_страна' )).показывать()

Выход:

Объяснение:

Функция StringType() доступна в модуле pyspark.sql.types. Мы уже импортировали этот модуль при создании PySpark DataFrame.

  1. Во-первых, UDF (определяемая пользователем функция) возвращает строки в верхнем регистре с помощью функции str.upper(). Функция str.upper() доступна в структуре данных серии (поскольку мы преобразуем ее в серию со стрелкой внутри функции), которая преобразует данную строку в верхний регистр. Наконец, эта функция применяется к столбцу «тип», указанному внутри метода select(). Раньше все строки в столбце типа были в нижнем регистре. Теперь они изменены на верхний регистр.
  2. Во-вторых, UDF возвращает строки в верхнем регистре, используя функцию str.lower(). Функция str.lower() доступна в структуре данных серии, которая преобразует данную строку в нижний регистр. Наконец, эта функция применяется к столбцу «тип», указанному внутри метода select(). Раньше все строки в столбце типа были в верхнем регистре. Теперь они заменены на строчные.

Пример 2: Pandas_udf() с целочисленным типом

Давайте создадим пользовательскую функцию, которая преобразует целочисленный столбец PySpark DataFrame в ряд Pandas и прибавит 100 к каждому значению. Передайте столбец «количество» этой функции внутри метода select().

# Добавить 100

@pandas_udf (целочисленный тип ())

def add_100(i: panda.Series) -> panda.Series:

вернуть я+ 100

# Передать столбец количества в вышеуказанную функцию и отобразить.

market_df.select( 'количество' ,добавить_100( 'количество' )).показывать()

Выход:

Объяснение:

Внутри UDF мы перебираем все значения и преобразовываем их в ряды. После этого мы добавляем 100 к каждому значению в серии. Наконец, мы передаем этой функции столбец «количество» и видим, что ко всем значениям добавляется 100.

Pandas_udf() с разными типами данных с использованием Groupby() и Agg()

Давайте рассмотрим примеры передачи UDF в агрегированные столбцы. Здесь значения столбцов сначала группируются с помощью функции groupby(), а агрегирование выполняется с помощью функции agg(). Мы передаем нашу UDF внутри этой агрегатной функции.

Синтаксис:

pyspark_dataframe_object.groupby( 'группировка_столбца' ).agg(УДФ
(pyspark_dataframe_object[ 'столбец' ]))

Здесь сначала группируются значения в столбце группировки. Затем агрегирование выполняется для каждой группы данных по отношению к нашей UDF.

Пример 1: Pandas_udf() с совокупным средним значением()

Здесь мы создаем определяемую пользователем функцию с типом возвращаемого значения float. Внутри функции мы вычисляем среднее значение с помощью функции mean(). Эта UDF передается в столбец «количество», чтобы получить среднее количество для каждого типа.

# вернуть среднее/среднее

@пандас_удф( 'плавать' )

def medium_function(i: panda.Series) -> float:

вернуть i.mean()

# Передать столбец количества в функцию, сгруппировав столбец типа.

market_df.groupby( 'тип' ).agg(средняя_функция(market_df[ 'количество' ])).показывать()

Выход:

Мы группируем на основе элементов в столбце «тип». Образуются две группы – «фруктовые» и «овощные». Для каждой группы вычисляется и возвращается среднее значение.

Пример 2: Pandas_udf() с Aggregate Max() и Min()

Здесь мы создаем две определяемые пользователем функции с типом возвращаемого значения integer (int). Первая пользовательская функция возвращает минимальное значение, а вторая пользовательская функция возвращает максимальное значение.

# pandas_udf, которые возвращают минимальное значение

@пандас_удф( 'инт' )

def min_(i: panda.Series) -> int:

вернуть i.min()

# pandas_udf, которые возвращают максимальное значение

@пандас_удф( 'инт' )

def max_(i: panda.Series) -> int:

вернуть i.max()

# Передать столбец количества в min_pandas_udf, сгруппировав locate_country.

market_df.groupby( 'найти_страна' ).agg(min_(market_df[ 'количество' ])).показывать()

# Передать столбец количества в max_ pandas_udf, сгруппировав locate_country.

market_df.groupby( 'найти_страна' ).agg(max_(market_df[ 'количество' ])).показывать()

Выход:

Чтобы вернуть минимальное и максимальное значения, мы используем функции min() и max() в возвращаемом типе UDF. Теперь мы группируем данные в столбце «locate_country». Образуются четыре группы («КИТАЙ», «ИНДИЯ», «ЯПОНИЯ», «США»). Для каждой группы мы возвращаем максимальное количество. Аналогично возвращаем минимальное количество.

Заключение

По сути, pandas_udf() используется для выполнения векторизованных операций в нашем PySpark DataFrame. Мы увидели, как создать pandas_udf() и применить его к PySpark DataFrame. Для лучшего понимания мы обсудили различные примеры, рассмотрев все типы данных (строка, число с плавающей запятой и целое число). Можно использовать pandas_udf() с groupby() через функцию agg().