Тема содержания:
- Pandas_udf() с разными типами данных
- Pandas_udf() с разными типами данных с использованием Groupby() и Agg()
Если вы хотите узнать об установке PySpark DataFrame и модуля, пройдите через это статья .
Pyspark.sql.functions.pandas_udf()
Pandas_udf() доступен в модуле sql.functions в PySpark, который можно импортировать с помощью ключевого слова from. Он используется для выполнения векторных операций в нашем PySpark DataFrame. Эта функция реализована как декоратор, передавая три параметра. После этого мы можем создать пользовательскую функцию, которая возвращает данные в векторном формате (например, мы используем для этого серию/NumPy) с помощью стрелки. В рамках этой функции мы можем вернуть результат.
Структура и синтаксис:
Во-первых, давайте посмотрим на структуру и синтаксис этой функции:
@pandas_udf (тип данных)def имя_функции (операция) -> convert_format:
оператор возврата
Здесь имя_функции — это имя нашей определенной функции. Тип данных указывает тип данных, возвращаемых этой функцией. Мы можем вернуть результат, используя ключевое слово «return». Все операции выполняются внутри функции с назначением стрелки.
Pandas_udf (функция и возвращаемый тип)
- Первый параметр — это определяемая пользователем функция, которая передается ему.
- Второй параметр используется для указания типа данных, возвращаемых функцией.
Данные:
Во всем этом руководстве мы используем только один PySpark DataFrame для демонстрации. Все пользовательские функции, которые мы определяем, применяются к этому PySpark DataFrame. Убедитесь, что вы сначала создали этот DataFrame в своей среде после установки PySpark.
импортировать pyspark
из pyspark.sql импортировать SparkSession
linuxhint_spark_app = SparkSession.builder.appName( «Подсказка по Линукс» ).getOrCreate()
из pyspark.sql.functions импортировать pandas_udf
из импорта pyspark.sql.types *
импортировать панд как панду
# детали овощей
овощ =[{ 'тип' : 'овощ' , 'имя' : 'помидор' , 'найти_страну' : 'США' , 'количество' : 800 },
{ 'тип' : 'фрукты' , 'имя' : 'банан' , 'найти_страну' : 'КИТАЙ' , 'количество' : двадцать },
{ 'тип' : 'овощ' , 'имя' : 'помидор' , 'найти_страну' : 'США' , 'количество' : 800 },
{ 'тип' : 'овощ' , 'имя' : 'Манго' , 'найти_страну' : 'ЯПОНИЯ' , 'количество' : 0 },
{ 'тип' : 'фрукты' , 'имя' : 'лимон' , 'найти_страну' : 'ИНДИЯ' , 'количество' : 1700 },
{ 'тип' : 'овощ' , 'имя' : 'помидор' , 'найти_страну' : 'США' , 'количество' : 1200 },
{ 'тип' : 'овощ' , 'имя' : 'Манго' , 'найти_страну' : 'ЯПОНИЯ' , 'количество' : 0 },
{ 'тип' : 'фрукты' , 'имя' : 'лимон' , 'найти_страну' : 'ИНДИЯ' , 'количество' : 0 }
]
# создаем рыночный фрейм данных из приведенных выше данных
market_df = linuxhint_spark_app.createDataFrame(овощ)
market_df.show()
Выход:
Здесь мы создаем этот DataFrame с 4 столбцами и 8 строками. Теперь мы используем pandas_udf() для создания пользовательских функций и применения их к этим столбцам.
Pandas_udf() с разными типами данных
В этом сценарии мы создаем некоторые пользовательские функции с помощью pandas_udf(), применяем их к столбцам и отображаем результаты с помощью метода select(). В каждом случае мы используем pandas.Series при выполнении векторизованных операций. Это рассматривает значения столбца как одномерный массив, и операция применяется к столбцу. В самом декораторе мы указываем тип возврата функции.
Пример 1: Pandas_udf() со строковым типом
Здесь мы создаем две определяемые пользователем функции со строковым типом возвращаемого значения для преобразования значений столбца строкового типа в верхний и нижний регистр. Наконец, мы применяем эти функции к столбцам «type» и «locate_country».
# Преобразование столбца type в верхний регистр с помощью pandas_udf@pandas_udf (StringType ())
def type_upper_case(i: panda.Series) -> panda.Series:
вернуть i.str.upper()
# Преобразование столбца locate_country в нижний регистр с помощью pandas_udf
@pandas_udf (StringType ())
def country_lower_case(i: panda.Series) -> panda.Series:
вернуть i.str.lower()
# Отобразить столбцы с помощью select()
market_df.select( 'тип' , type_upper_case( 'тип' ), 'найти_страна' ,
страна_нижний_регистр( 'найти_страна' )).показывать()
Выход:
Объяснение:
Функция StringType() доступна в модуле pyspark.sql.types. Мы уже импортировали этот модуль при создании PySpark DataFrame.
- Во-первых, UDF (определяемая пользователем функция) возвращает строки в верхнем регистре с помощью функции str.upper(). Функция str.upper() доступна в структуре данных серии (поскольку мы преобразуем ее в серию со стрелкой внутри функции), которая преобразует данную строку в верхний регистр. Наконец, эта функция применяется к столбцу «тип», указанному внутри метода select(). Раньше все строки в столбце типа были в нижнем регистре. Теперь они изменены на верхний регистр.
- Во-вторых, UDF возвращает строки в верхнем регистре, используя функцию str.lower(). Функция str.lower() доступна в структуре данных серии, которая преобразует данную строку в нижний регистр. Наконец, эта функция применяется к столбцу «тип», указанному внутри метода select(). Раньше все строки в столбце типа были в верхнем регистре. Теперь они заменены на строчные.
Пример 2: Pandas_udf() с целочисленным типом
Давайте создадим пользовательскую функцию, которая преобразует целочисленный столбец PySpark DataFrame в ряд Pandas и прибавит 100 к каждому значению. Передайте столбец «количество» этой функции внутри метода select().
# Добавить 100@pandas_udf (целочисленный тип ())
def add_100(i: panda.Series) -> panda.Series:
вернуть я+ 100
# Передать столбец количества в вышеуказанную функцию и отобразить.
market_df.select( 'количество' ,добавить_100( 'количество' )).показывать()
Выход:
Объяснение:
Внутри UDF мы перебираем все значения и преобразовываем их в ряды. После этого мы добавляем 100 к каждому значению в серии. Наконец, мы передаем этой функции столбец «количество» и видим, что ко всем значениям добавляется 100.
Pandas_udf() с разными типами данных с использованием Groupby() и Agg()
Давайте рассмотрим примеры передачи UDF в агрегированные столбцы. Здесь значения столбцов сначала группируются с помощью функции groupby(), а агрегирование выполняется с помощью функции agg(). Мы передаем нашу UDF внутри этой агрегатной функции.
Синтаксис:
pyspark_dataframe_object.groupby( 'группировка_столбца' ).agg(УДФ(pyspark_dataframe_object[ 'столбец' ]))
Здесь сначала группируются значения в столбце группировки. Затем агрегирование выполняется для каждой группы данных по отношению к нашей UDF.
Пример 1: Pandas_udf() с совокупным средним значением()
Здесь мы создаем определяемую пользователем функцию с типом возвращаемого значения float. Внутри функции мы вычисляем среднее значение с помощью функции mean(). Эта UDF передается в столбец «количество», чтобы получить среднее количество для каждого типа.
# вернуть среднее/среднее@пандас_удф( 'плавать' )
def medium_function(i: panda.Series) -> float:
вернуть i.mean()
# Передать столбец количества в функцию, сгруппировав столбец типа.
market_df.groupby( 'тип' ).agg(средняя_функция(market_df[ 'количество' ])).показывать()
Выход:
Мы группируем на основе элементов в столбце «тип». Образуются две группы – «фруктовые» и «овощные». Для каждой группы вычисляется и возвращается среднее значение.
Пример 2: Pandas_udf() с Aggregate Max() и Min()
Здесь мы создаем две определяемые пользователем функции с типом возвращаемого значения integer (int). Первая пользовательская функция возвращает минимальное значение, а вторая пользовательская функция возвращает максимальное значение.
# pandas_udf, которые возвращают минимальное значение@пандас_удф( 'инт' )
def min_(i: panda.Series) -> int:
вернуть i.min()
# pandas_udf, которые возвращают максимальное значение
@пандас_удф( 'инт' )
def max_(i: panda.Series) -> int:
вернуть i.max()
# Передать столбец количества в min_pandas_udf, сгруппировав locate_country.
market_df.groupby( 'найти_страна' ).agg(min_(market_df[ 'количество' ])).показывать()
# Передать столбец количества в max_ pandas_udf, сгруппировав locate_country.
market_df.groupby( 'найти_страна' ).agg(max_(market_df[ 'количество' ])).показывать()
Выход:
Чтобы вернуть минимальное и максимальное значения, мы используем функции min() и max() в возвращаемом типе UDF. Теперь мы группируем данные в столбце «locate_country». Образуются четыре группы («КИТАЙ», «ИНДИЯ», «ЯПОНИЯ», «США»). Для каждой группы мы возвращаем максимальное количество. Аналогично возвращаем минимальное количество.
Заключение
По сути, pandas_udf() используется для выполнения векторизованных операций в нашем PySpark DataFrame. Мы увидели, как создать pandas_udf() и применить его к PySpark DataFrame. Для лучшего понимания мы обсудили различные примеры, рассмотрев все типы данных (строка, число с плавающей запятой и целое число). Можно использовать pandas_udf() с groupby() через функцию agg().