Как реализовать потоковую передачу данных в реальном времени в Python

Kak Realizovat Potokovuu Peredacu Dannyh V Real Nom Vremeni V Python



Освоение реализации потоковой передачи данных в реальном времени на Python является важным навыком в современном мире, связанном с данными. В этом руководстве рассматриваются основные шаги и основные инструменты для использования потоковой передачи данных в реальном времени с аутентичностью в Python. От выбора подходящей среды, такой как Apache Kafka или Apache Pulsar, до написания кода Python для простого потребления, обработки и эффективной визуализации данных — мы приобретем необходимые навыки для создания гибких и эффективных каналов данных в реальном времени.

Пример 1. Реализация потоковой передачи данных в реальном времени на Python

Реализация потоковой передачи данных в реальном времени в Python имеет решающее значение в современном мире, где данные управляются. В этом подробном примере мы рассмотрим процесс создания системы потоковой передачи данных в реальном времени с использованием Apache Kafka и Python в Google Colab.







Чтобы инициализировать пример перед тем, как мы начнем кодирование, необходимо создать определенную среду в Google Colab. Первое, что нам нужно сделать, это установить необходимые библиотеки. Мы используем библиотеку «kafka-python» для интеграции Kafka.



! пункт установить Кафка-Питон


Эта команда устанавливает библиотеку «kafka-python», которая предоставляет функции Python и привязки для Apache Kafka. Далее импортируем необходимые библиотеки для нашего проекта. Импорт необходимых библиотек, включая «KafkaProducer» и «KafkaConsumer» — это классы из библиотеки «kafka-python», которые позволяют нам взаимодействовать с брокерами Kafka. JSON — это библиотека Python для работы с данными JSON, которые мы используем для сериализации и десериализации сообщений.



из импорта Kafka KafkaProducer, KafkaConsumer
импортировать JSON


Создание продюсера Kafka





Это важно, поскольку производитель Kafka отправляет данные в тему Kafka. В нашем примере мы создаем производителя для отправки смоделированных данных в реальном времени в тему, называемую «тема в реальном времени».

Мы создаем экземпляр KafkaProducer, который указывает адрес брокера Kafka как «localhost:9092». Затем мы используем «value_serializer» — функцию, которая сериализует данные перед отправкой их в Kafka. В нашем случае лямбда-функция кодирует данные как JSON в кодировке UTF-8. Теперь давайте смоделируем некоторые данные в реальном времени и отправим их в тему Kafka.



производитель = KafkaProducer ( bootstrap_servers '=' 'локальный хост: 9092' ,
value_serializer =лямбда v: json.dumps ( в ) .кодировать ( 'utf-8' ) )
# Моделируемые данные в реальном времени
данные = { 'sensor_id' : 1 , 'температура' : 25,5 , 'влажность' : 60,2 }
# Отправляем данные в тему
продюсер.отправить ( 'тема в реальном времени' , данные )


В этих строках мы определяем словарь «данных», который представляет смоделированные данные датчика. Затем мы используем метод «отправить», чтобы опубликовать эти данные в «теме реального времени».

Затем мы хотим создать потребителя Kafka, и потребитель Kafka считывает данные из темы Kafka. Мы создаем потребителя для потребления и обработки сообщений в «теме реального времени». Мы создаем экземпляр KafkaConsumer, указывая тему, которую мы хотим использовать, например (тема в реальном времени) и адрес брокера Kafka. Затем «value_deserializer» — это функция, которая десериализует данные, полученные от Kafka. В нашем случае лямбда-функция декодирует данные как JSON в кодировке UTF-8.

потребитель = КафкаКонсумер ( 'тема в реальном времени' ,
bootstrap_servers '=' 'локальный хост: 9092' ,
value_deserializer =лямбда х: json.loads ( x.decode ( 'utf-8' ) ) )


Мы используем итеративный цикл для непрерывного получения и обработки сообщений из темы.

# Чтение и обработка данных в реальном времени
для сообщение в потребитель:
данные = сообщение.значение
Распечатать ( ж «Полученные данные: {данные}» )


Мы извлекаем значение каждого сообщения и данные смоделированных датчиков внутри цикла и выводим их на консоль. Запуск производителя и потребителя Kafka включает запуск этого кода в Google Colab и индивидуальное выполнение ячеек кода. Производитель отправляет смоделированные данные в тему Kafka, а потребитель считывает и распечатывает полученные данные.


Анализ выходных данных во время выполнения кода

Мы будем наблюдать за данными в реальном времени, которые производятся и потребляются. Формат данных может варьироваться в зависимости от нашего моделирования или фактического источника данных. В этом подробном примере мы рассмотрим весь процесс настройки системы потоковой передачи данных в реальном времени с использованием Apache Kafka и Python в Google Colab. Мы объясним каждую строку кода и ее значение при построении этой системы. Потоковая передача данных в реальном времени — это мощная возможность, и этот пример служит основой для более сложных реальных приложений.

Пример 2. Реализация потоковой передачи данных в реальном времени на Python с использованием данных фондового рынка

Давайте приведем еще один уникальный пример реализации потоковой передачи данных в реальном времени в Python, используя другой сценарий; на этот раз мы сосредоточимся на данных фондового рынка. Мы создаем систему потоковой передачи данных в режиме реального времени, которая фиксирует изменения цен на акции и обрабатывает их с помощью Apache Kafka и Python в Google Colab. Как показано в предыдущем примере, мы начинаем с настройки нашей среды в Google Colab. Сначала устанавливаем необходимые библиотеки:

! пункт установить Кафка-Питон yfinance


Здесь мы добавляем библиотеку «yfinance», которая позволяет нам получать данные фондового рынка в режиме реального времени. Далее импортируем необходимые библиотеки. Мы продолжаем использовать классы «KafkaProducer» и «KafkaConsumer» из библиотеки «kafka-python» для взаимодействия с Kafka. Мы импортируем JSON для работы с данными JSON. Мы также используем «yfinance» для получения данных фондового рынка в режиме реального времени. Мы также импортируем библиотеку «time», чтобы добавить задержку для имитации обновлений в реальном времени.

из импорта Kafka KafkaProducer, KafkaConsumer
импортировать JSON
импортировать финансы как да
Импортировать время


Теперь мы создаем производитель Kafka для данных о запасах. Наш производитель Kafka получает данные о запасах в режиме реального времени и отправляет их в тему Kafka под названием «цена на акции».

производитель = KafkaProducer ( bootstrap_servers '=' 'локальный хост: 9092' ,
value_serializer =лямбда v: json.dumps ( в ) .кодировать ( 'utf-8' ) )

пока Истинный:
акции = yf.Тикер ( 'ААПЛ' ) # Пример: акции Apple Inc.
stock_data = stock.история ( период '=' '1д' )
последняя_цена = данные_запаса [ 'Закрывать' ] .iloc [ - 1 ]
данные = { 'символ' : 'ААПЛ' , 'цена' : Последняя цена }
продюсер.отправить ( 'цена акции' , данные )
время.сон ( 10 ) # Имитировать обновления в реальном времени каждые 10 секунд


В этом коде мы создаем экземпляр KafkaProducer с адресом брокера Kafka. Внутри цикла мы используем «yfinance», чтобы получить последнюю цену акций Apple Inc. («AAPL»). Затем мы извлекаем последнюю цену закрытия и отправляем ее в тему «цена акций». В конце концов мы вводим задержку для имитации обновлений в реальном времени каждые 10 секунд.

Давайте создадим потребителя Kafka для чтения и обработки данных о ценах на акции из темы «цена на акции».

потребитель = КафкаКонсумер ( 'цена акции' ,
bootstrap_servers '=' 'локальный хост: 9092' ,
value_deserializer =лямбда х: json.loads ( x.decode ( 'utf-8' ) ) )

для сообщение в потребитель:
stock_data = сообщение.значение
Распечатать ( ж «Полученные данные об акциях: {stock_data['symbol']} — цена: {stock_data['price']}» )


Этот код аналогичен настройке потребителя из предыдущего примера. Он постоянно считывает и обрабатывает сообщения из темы «цена акций» и выводит на консоль символ акции и цену. Мы выполняем ячейки кода последовательно, например, одну за другой в Google Colab для запуска производителя и потребителя. Производитель получает и отправляет обновления цен на акции в режиме реального времени, в то время как потребитель считывает и отображает эти данные.

! пункт установить Кафка-Питон yfinance
из импорта Kafka KafkaProducer, KafkaConsumer
импортировать JSON
импортировать финансы как да
Импортировать время
производитель = KafkaProducer ( bootstrap_servers '=' 'локальный хост: 9092' ,
value_serializer =лямбда v: json.dumps ( в ) .кодировать ( 'utf-8' ) )

пока Истинный:
акции = yf.Тикер ( 'ААПЛ' ) # акции Apple Inc.
stock_data = stock.история ( период '=' '1д' )
последняя_цена = данные_запаса [ 'Закрывать' ] .iloc [ - 1 ]

данные = { 'символ' : 'ААПЛ' , 'цена' : Последняя цена }

продюсер.отправить ( 'цена акции' , данные )

время.сон ( 10 ) # Имитировать обновления в реальном времени каждые 10 секунд
потребитель = КафкаКонсумер ( 'цена акции' ,
bootstrap_servers '=' 'локальный хост: 9092' ,
value_deserializer =лямбда х: json.loads ( x.decode ( 'utf-8' ) ) )

для сообщение в потребитель:
stock_data = сообщение.значение
Распечатать ( ж «Полученные данные об акциях: {stock_data['symbol']} — цена: {stock_data['price']}» )


При анализе результатов после запуска кода мы будем наблюдать в реальном времени обновления цен на акции Apple Inc., которые производятся и потребляются.

Заключение

В этом уникальном примере мы продемонстрировали реализацию потоковой передачи данных в реальном времени на Python с использованием Apache Kafka и библиотеки «yfinance» для сбора и обработки данных фондового рынка. Мы подробно объяснили каждую строчку кода. Потоковая передача данных в реальном времени может применяться в различных областях для создания реальных приложений в сфере финансов, Интернета вещей и т. д.