Pyspark.sql.DataFrameWriter.saveAsTable()
Сначала мы увидим, как записать существующий кадр данных PySpark в таблицу с помощью функции write.saveAsTable(). Для записи DataFrame в таблицу требуется имя таблицы и другие необязательные параметры, такие как режимы, partionBy и т. д. Хранится в виде паркетного файла.
Синтаксис:
dataframe_obj.write.saveAsTable(путь/имя_таблицы,режим,partitionBy,…)
- Table_name — это имя таблицы, созданной из dataframe_obj.
- Мы можем добавить/перезаписать данные таблицы, используя параметр режима.
- PartitionBy использует один или несколько столбцов для создания разделов на основе значений в этих предоставленных столбцах.
Пример 1:
Создайте PySpark DataFrame с 5 строками и 4 столбцами. Запишите этот кадр данных в таблицу с именем «Agri_Table1».
импортировать pyspark
из pyspark.sql импортировать SparkSession
linuxhint_spark_app = SparkSession.builder.appName( «Подсказка по Линукс» ).getOrCreate()
# фермерские данные с 5 строками и 5 столбцами
сельское хозяйство =[{ 'Тип_почвы' : «Черный» , «Ирригация_доступность» : 'Нет' , 'Акры' : 2500 , 'Soil_status' : 'Сухой' ,
'Страна' : 'США' },
{ 'Тип_почвы' : «Черный» , «Ирригация_доступность» : 'Да' , 'Акры' : 3500 , 'Soil_status' : 'Влажный' ,
'Страна' : 'Индия' },
{ 'Тип_почвы' : 'Красный' , «Ирригация_доступность» : 'Да' , 'Акры' : 210 , 'Soil_status' : 'Сухой' ,
'Страна' : 'ВЕЛИКОБРИТАНИЯ' },
{ 'Тип_почвы' : 'Другой' , «Ирригация_доступность» : 'Нет' , 'Акры' : 1000 , 'Soil_status' : 'Влажный' ,
'Страна' : 'США' },
{ 'Тип_почвы' : 'Песок' , «Ирригация_доступность» : 'Нет' , 'Акры' : 500 , 'Soil_status' : 'Сухой' ,
'Страна' : 'Индия' }]
# создаем фрейм данных из приведенных выше данных
agri_df = linuxhint_spark_app.createDataFrame (агри)
agri_df.show()
# Записываем указанный выше DataFrame в таблицу.
agri_df.coalesce( 1 ).write.saveAsTable( 'Сельское хозяйство_Таблица1' )
Выход:
Мы видим, что один файл паркета создается с предыдущими данными PySpark.
Пример 2:
Рассмотрим предыдущий DataFrame и запишите «Agri_Table2» в таблицу, разделив записи на основе значений в столбце «Страна».
# Записываем указанный выше DataFrame в таблицу с параметром partitionByagri_df.write.saveAsTable( 'Агро_Таблица2' ,partitionBy=[ 'Страна' ])
Выход:
В столбце «Страна» есть три уникальных значения — «Индия», «Великобритания» и «США». Итак, три раздела созданы. Каждый раздел содержит паркетные файлы.
Pyspark.sql.DataFrameReader.table()
Давайте загрузим таблицу в PySpark DataFrame, используя функцию spark.read.table(). Он принимает только один параметр, который является путем/именем таблицы. Он напрямую загружает таблицу в PySpark DataFrame, и все функции SQL, которые применяются к PySpark DataFrame, также могут применяться к этому загруженному DataFrame.
Синтаксис:
spark_app.read.table (путь/имя_таблицы)В этом сценарии мы используем предыдущую таблицу, созданную из PySpark DataFrame. Убедитесь, что вам нужно реализовать фрагменты кода предыдущего сценария в вашей среде.
Пример:
Загрузите таблицу «Agri_Table1» в DataFrame с именем «loaded_data».
загруженные_данные = linuxhint_spark_app.read.table( 'Agri_Table1' )загруженные_данные.show()
Выход:
Мы видим, что таблица загружается в PySpark DataFrame.
Выполнение запросов SQL
Теперь мы выполняем некоторые SQL-запросы к загруженному DataFrame, используя функцию spark.sql().
# Используйте команду SELECT, чтобы отобразить все столбцы из приведенной выше таблицы.linuxhint_spark_app.sql( 'ВЫБЕРИТЕ * из Agri_Table1' ).показывать()
# ГДЕ Предложение
linuxhint_spark_app.sql( 'SELECT * from Agri_Table1 WHERE Soil_status='Dry'' ).показывать()
linuxhint_spark_app.sql( 'ВЫБЕРИТЕ * из Agri_Table1, ГДЕ Акры > 2000' ).показывать()
Выход:
- Первый запрос отображает все столбцы и записи из DataFrame.
- Второй запрос отображает записи на основе столбца «Soil_status». Всего три записи с элементом «Сухой».
- Последний запрос возвращает две записи с «Акрами», превышающими 2000.
Pyspark.sql.DataFrameWriter.insertInto()
Используя функцию insertInto(), мы можем добавить DataFrame в существующую таблицу. Мы можем использовать эту функцию вместе с selectExpr(), чтобы определить имена столбцов, а затем вставить их в таблицу. Эта функция также принимает имя таблицы в качестве параметра.
Синтаксис:
DataFrame_obj.write.insertInto('Table_name')В этом сценарии мы используем предыдущую таблицу, созданную из PySpark DataFrame. Убедитесь, что вам нужно реализовать фрагменты кода предыдущего сценария в вашей среде.
Пример:
Создайте новый DataFrame с двумя записями и вставьте их в таблицу «Agri_Table1».
импортировать pysparkиз pyspark.sql импортировать SparkSession
linuxhint_spark_app = SparkSession.builder.appName( «Подсказка по Линукс» ).getOrCreate()
# фермерские данные с 2 строками
сельское хозяйство =[{ 'Тип_почвы' : 'Песок' , «Ирригация_доступность» : 'Нет' , 'Акры' : 2500 , 'Soil_status' : 'Сухой' ,
'Страна' : 'США' },
{ 'Тип_почвы' : 'Песок' , «Ирригация_доступность» : 'Нет' , 'Акры' : 1200 , 'Soil_status' : 'Влажный' ,
'Страна' : 'Япония' }]
# создаем фрейм данных из приведенных выше данных
agri_df2 = linuxhint_spark_app.createDataFrame (агро)
agri_df2.show()
# write.insertInto()
agri_df2.selectExpr( 'Акры' , 'Страна' , 'Ирригация_доступность' , 'Тип_почвы' ,
'Состояние_почвы' ).write.insertInto( 'Сельское хозяйство_Таблица1' )
# Показать окончательный Agri_Table1
linuxhint_spark_app.sql( 'ВЫБЕРИТЕ * из Agri_Table1' ).показывать()
Выход:
Теперь общее количество строк, присутствующих в DataFrame, равно 7.
Заключение
Теперь вы понимаете, как записать кадр данных PySpark в таблицу с помощью функции write.saveAsTable(). Он принимает имя таблицы и другие необязательные параметры. Затем мы загрузили эту таблицу в PySpark DataFrame с помощью функции spark.read.table(). Он принимает только один параметр, который является путем/именем таблицы. Если вы хотите добавить новый DataFrame в существующую таблицу, используйте функцию insertInto().