PySpark Read.Parquet()

Pyspark Read Parquet



В PySpark функция write.parquet() записывает кадр данных в файл паркета, а функция read.parquet() считывает файл паркета в кадр данных PySpark или любой другой источник данных. Чтобы быстро и эффективно обрабатывать столбцы в Apache Spark, нам необходимо сжать данные. Сжатие данных экономит нашу память, и все столбцы преобразуются в плоский уровень. Это означает, что существует хранилище на уровне плоского столбца. Файл, в котором они хранятся, известен как файл PARQUET.

В этом руководстве мы в основном сосредоточимся на чтении/загрузке файла паркета в PySpark DataFrame/SQL с использованием функции read.parquet(), доступной в классе pyspark.sql.DataFrameReader.

Тема содержания:







Получить файл паркета



Считайте файл Parquet в PySpark DataFrame



Прочитайте файл Parquet в PySpark SQL





Pyspark.sql.DataFrameReader.parquet()

Эта функция используется для чтения файла паркета и его загрузки в PySpark DataFrame. Он принимает путь/имя файла паркета. Мы можем просто использовать функцию read.parquet(), так как это общая функция.

Синтаксис:



Давайте посмотрим на синтаксис read.parquet():

spark_app.read.parquet(имя_файла.parquet/путь)

Сначала установите модуль PySpark с помощью команды pip:

pip установить pyspark

Получить файл паркета

Чтобы прочитать файл паркета, вам нужны данные, в которых файл паркета генерируется из этих данных. В этой части мы увидим, как создать файл паркета из PySpark DataFrame.

Давайте создадим PySpark DataFrame с 5 записями и запишем это в файл паркета «industry_parquet».

импортировать pyspark

из pyspark.sql импортировать SparkSession, Row

linuxhint_spark_app = SparkSession.builder.appName( «Подсказка по Линукс» ).getOrCreate()

# создаем фрейм данных, в котором хранится информация об отрасли

Industry_df = linuxhint_spark_app.createDataFrame([Строка(Тип= 'Сельское хозяйство' , Площадь = 'США' ,
Рейтинг= 'Горячий' ,Всего_сотрудников= 100 ),

Строка (Тип = 'Сельское хозяйство' , Площадь = 'Индия' ,Рейтинг= 'Горячий' ,Всего_сотрудников= 200 ),

Строка (Тип = 'Разработка' , Площадь = 'США' ,Рейтинг= 'Теплый' ,Всего_сотрудников= 100 ),

Строка (Тип = 'Образование' , Площадь = 'США' ,Рейтинг= 'Прохладный' ,Всего_сотрудников= 400 ),

Строка (Тип = 'Образование' , Площадь = 'США' ,Рейтинг= 'Теплый' ,Всего_сотрудников= двадцать )

])

# Фактический кадр данных

Industry_df.show ()

# Записываем industry_df в файл паркета

Industry_df.coalesce( 1 ).написать.паркет( 'индустрия_паркет' )

Выход:

Это DataFrame, который содержит 5 записей.

Файл паркета создается для предыдущего кадра данных. Здесь имя нашего файла с расширением «part-00000-ff70f69d-f1fb-4450-b4b4-dfd5a8d6c7ad-c000.snappy.parquet». Мы используем этот файл во всем руководстве.

Считайте файл Parquet в PySpark DataFrame

У нас есть паркетная доска. Давайте прочитаем этот файл с помощью функции read.parquet() и загрузим его в PySpark DataFrame.

импортировать pyspark

из pyspark.sql импортировать SparkSession, Row

linuxhint_spark_app = SparkSession.builder.appName( «Подсказка по Линукс» ).getOrCreate()

# Прочитать файл паркета в объект dataframe_from_parquet.

dataframe_from_parquet = linuxhint_spark_app.read.parquet ( 'part-00000-ff70f69d-f1fb-4450-b4b4-dfd5a8d6c7ad-c000.snappy.parquet' )

# Показать dataframe_from_parquet-DataFrame

dataframe_from_parquet.show()

Выход:

Мы отображаем DataFrame, используя метод show(), который был создан из файла паркета.

SQL-запросы с файлом Parquet

После загрузки в DataFrame можно создать таблицы SQL и отобразить данные, присутствующие в DataFrame. Нам нужно создать ВРЕМЕННОЕ ПРЕДСТАВЛЕНИЕ и использовать команды SQL для возврата записей из DataFrame, созданного из файла паркета.

Пример 1:

Создайте временное представление с именем «Секторы» и используйте команду SELECT для отображения записей в DataFrame. Вы можете обратиться к этому руководство это объясняет, как создать VIEW в Spark — SQL.

импортировать pyspark

из pyspark.sql импортировать SparkSession, Row

linuxhint_spark_app = SparkSession.builder.appName( «Подсказка по Линукс» ).getOrCreate()

# Прочитать файл паркета в объект dataframe_from_parquet.

dataframe_from_parquet = linuxhint_spark_app.read.parquet ( 'part-00000-ff70f69d-f1fb-4450-b4b4-dfd5a8d6c7ad-c000.snappy.parquet' )

# Создаем вид из указанного выше файла паркета с именем - 'Сектора'

dataframe_from_parquet.createOrReplaceTempView( «Сектора» )

# Запрос для отображения всех записей из секторов

linuxhint_spark_app.sql( 'выбрать * из секторов' ).показывать()

Выход:

Пример 2:

Используя предыдущий VIEW, напишите запрос SQL:

  1. Чтобы отобразить все записи из секторов, принадлежащих «Индии».
  2. Чтобы отобразить все записи из секторов с сотрудником, который больше 100.
# Запрос для отображения всех записей из секторов, принадлежащих 'Индии'.

linuxhint_spark_app.sql( 'выберите * из секторов, где Area='Индия'' ).показывать()

# Запрос для отображения всех записей из секторов с числом сотрудников более 100

linuxhint_spark_app.sql( 'выберите * из секторов, где Total_employees > 100' ).показывать()

Выход:

Есть только одна запись с областью «Индия» и две записи с количеством сотрудников больше 100.

Прочитайте файл Parquet в PySpark SQL

Во-первых, нам нужно создать ВИД с помощью команды CREATE. Используя ключевое слово «путь» в SQL-запросе, мы можем прочитать файл паркета в Spark SQL. После пути нам нужно указать имя файла/местоположение файла.

Синтаксис:

spark_app.sql( 'СОЗДАТЬ ВРЕМЕННЫЙ ВИД view_name ИСПОЛЬЗУЯ ПАРАМЕТРЫ паркета (путь ' имя_файла.parquet ')' )

Пример 1:

Создайте временное представление с именем «Sector2» и прочитайте в него файл паркета. Используя функцию sql(), напишите запрос выбора для отображения всех записей, присутствующих в представлении.

импортировать pyspark

из pyspark.sql импортировать SparkSession, Row

linuxhint_spark_app = SparkSession.builder.appName( «Подсказка по Линукс» ).getOrCreate()

# Прочитать файл паркета в Spark-SQL

linuxhint_spark_app.sql( 'СОЗДАТЬ ВРЕМЕННЫЙ ВИД Сектор2, ИСПОЛЬЗУЯ ПАРАМЕТРЫ паркета (путь ' часть-00000-ff70f69d-f1fb- 4450 -b4b4-dfd5a8d6c7ad-c000.моментальный.паркет ')' )

# Запрос для отображения всех записей из Sector2

linuxhint_spark_app.sql( 'выбрать * из Сектора2' ).показывать()

Выход:

Пример 2:

Используйте предыдущий VIEW и напишите запрос для отображения всех записей с рейтингом «Hot» или «Cool».

# Запрос для отображения всех записей из Sector2 с рейтингом Hot или Cool.

linuxhint_spark_app.sql( 'выберите * из Sector2, где Rating='Hot' OR Rating='Cool'' ).показывать()

Выход:

Есть три записи с рейтингом «Горячий» или «Крутой».

Заключение

В PySpark функция write.parquet() записывает кадр данных в файл паркета. Функция read.parquet() считывает файл паркета в PySpark DataFrame или любой другой источник данных. Мы научились читать файл паркета в PySpark DataFrame и в таблицу PySpark. В рамках этого руководства мы также обсудили, как создавать таблицы из фрейма данных PySpark и фильтровать данные с помощью предложения WHERE.