Ваша задача: создать консьюмера, который будет читать события из Kafka, фильтровать только события с URL-адресами, содержащими слово "product", и сохранять их в новый топик `filtered_clicks`.
Решение:
```python
from confluent_kafka import Producer, Consumer
import json
# Настройки Kafka
broker = 'localhost:9092'
# Создание продюсера для записи в новый топик
producer = Producer({'bootstrap.servers': broker})
def produce_filtered_event(event):
producer.produce('filtered_clicks', value=json.dumps(event))
producer.flush()
# Создание консьюмера для чтения из исходного топика
consumer = Consumer({
'bootstrap.servers': broker,
'group.id': 'clickstream-group',
'auto.offset.reset': 'earliest'
})
consumer.subscribe(['clickstream'])
# Чтение и фильтрация событий
try:
while True:
msg = consumer.poll(1.0)
if msg is None:
continue
if msg.error():
continue
# Преобразуем сообщение из Kafka в Python-объект
event = json.loads(msg.value().decode('utf-8'))
# Фильтруем события с URL, содержащими "product"
if 'product' in event['url']:
print(f"Фильтруем событие: {event}")
produce_filtered_event(event)
except KeyboardInterrupt:
print("Завершение работы.")
finally:
consumer.close()
```
Объяснение:
– Консьюмер читает события из топика `clickstream`.
– Каждое сообщение проверяется на наличие слова "product" в поле `url`.
– Отфильтрованные события отправляются в новый топик `filtered_clicks` через продюсера.
Задача 2: Подсчет количества событий в реальном времени
Описание:
Топик `log_events` содержит логи системы. Каждое сообщение содержит:
– `log_level` (например, "INFO", "ERROR", "DEBUG").
– `message` (текст лога).
Ваша задача: написать программу, которая считает количество событий уровня "ERROR" в реальном времени и каждые 10 секунд выводит их общее количество.
Решение:
```python
from confluent_kafka import Consumer
import time
# Настройки Kafka
broker = 'localhost:9092'
# Создание консьюмера
consumer = Consumer({
'bootstrap.servers': broker,
'group.id': 'log-group',
'auto.offset.reset': 'earliest'
})
consumer.subscribe(['log_events'])
error_count = 0
start_time = time.time()
try:
while True:
msg = consumer.poll(1.0)
if msg is None:
continue
if msg.error():
continue
# Преобразуем сообщение в Python-объект
log_event = json.loads(msg.value().decode('utf-8'))
# Увеличиваем счетчик, если уровень лога "ERROR"
if log_event['log_level'] == 'ERROR':
error_count += 1
# Каждые 10 секунд выводим текущий счетчик
if time.time() – start_time >= 10:
print(f"Количество ошибок за последние 10 секунд: {error_count}")
error_count = 0
start_time = time.time()
except KeyboardInterrupt:
print("Завершение работы.")
finally:
consumer.close()
```
Объяснение:
– Консьюмер читает события из топика `log_events`.
– Если уровень лога "ERROR", увеличивается счетчик `error_count`.
– Каждые 10 секунд программа выводит количество событий "ERROR" и сбрасывает счетчик.
Задача 3: Агрегация данных по группам
Описание:
Топик `transactions` содержит данные о финансовых транзакциях:
– `user_id` – идентификатор пользователя.
– `amount` – сумма транзакции.
Ваша задача: написать программу, которая подсчитывает общую сумму транзакций для каждого пользователя и выводит результаты в реальном времени.
Решение:
```python
from confluent_kafka import Consumer
import json
from collections import defaultdict
# Настройки Kafka
broker = 'localhost:9092'
# Создание консьюмера
consumer = Consumer({
'bootstrap.servers': broker,
'group.id': 'transaction-group',
'auto.offset.reset': 'earliest'
})
consumer.subscribe(['transactions'])
# Словарь для хранения сумм по пользователям
user_totals = defaultdict(float)
try:
while True:
msg = consumer.poll(1.0)
if msg is None:
continue
if msg.error():
continue
# Преобразуем сообщение в Python-объект
transaction = json.loads(msg.value().decode('utf-8'))
# Обновляем сумму для пользователя
user_id = transaction['user_id']
user_totals[user_id] += transaction['amount']
# Вывод текущих сумм
print(f"Текущая сумма транзакций по пользователям: {dict(user_totals)}")
except KeyboardInterrupt:
print("Завершение работы.")
finally:
consumer.close()
```
Объяснение:
– Консьюмер читает данные из топика `transactions`.
– Для каждого пользователя обновляется сумма его транзакций в словаре `user_totals`.
– Программа выводит текущие суммы по всем пользователям.
Задача 4: Сохранение обработанных данных в файл
Описание:
Топик `sensor_data` содержит данные с датчиков IoT:
– `sensor_id` – идентификатор датчика.
– `temperature` – измеренная температура.
– `timestamp` – время измерения.
Ваша задача: написать программу, которая сохраняет все данные о температуре выше 30°C в файл `high_temp.json`.
Решение:
```python
from confluent_kafka import Consumer
import json
# Настройки Kafka
broker = 'localhost:9092'
# Создание консьюмера
consumer = Consumer({
'bootstrap.servers': broker,
'group.id': 'sensor-group',
'auto.offset.reset': 'earliest'
})
consumer.subscribe(['sensor_data'])
# Открываем файл для записи
with open('high_temp.json', 'w') as outfile:
try:
while True:
msg = consumer.poll(1.0)
if msg is None:
continue
if msg.error():
continue
# Преобразуем сообщение в Python-объект
sensor_data = json.loads(msg.value().decode('utf-8'))
# Сохраняем данные, если температура выше 30°C
if sensor_data['temperature'] > 30:
json.dump(sensor_data, outfile)
outfile.write('\n') # Новый ряд для каждого объекта
except KeyboardInterrupt:
print("Завершение работы.")
finally:
consumer.close()
```
Объяснение:
– Консьюмер читает данные из топика `sensor_data`.
– Данные с температурой выше 30°C записываются в файл `high_temp.json`.
Задача 5: Обнаружение аномалий в данных
Описание:
В топик `temperature_readings` поступают данные о температуре из различных городов:
– `city` – название города.
– `temperature` – измеренная температура.
– `timestamp` – время измерения.
Ваша задача: написать программу, которая будет находить и выводить аномалии – случаи, когда температура превышает 40°C или опускается ниже -10°C.
Решение:
```python
from confluent_kafka import Consumer
import json
# Настройки Kafka
broker = 'localhost:9092'
# Создание консьюмера
consumer = Consumer({
'bootstrap.servers': broker,
'group.id': 'temperature-group',
'auto.offset.reset': 'earliest'
})
consumer.subscribe(['temperature_readings'])
try:
while True:
msg = consumer.poll(1.0)
if msg is None:
continue
if msg.error():
continue
# Преобразуем сообщение в Python-объект
reading = json.loads(msg.value().decode('utf-8'))
# Проверяем на аномалии
if reading['temperature'] > 40 or reading['temperature'] < -10:
print(f"Аномалия! Город: {reading['city']}, Температура: {reading['temperature']}°C")
except KeyboardInterrupt:
print("Завершение работы.")
finally:
consumer.close()
```
Объяснение:
– Консьюмер читает данные о температуре из топика.
– Если температура выходит за пределы нормального диапазона, программа выводит сообщение об аномалии.
Задача 6: Потоковое объединение данных
Описание:
Есть два топика:
1. `orders` – содержит данные о заказах: `order_id`, `product_id`, `quantity`.
2. `products` – содержит данные о товарах: `product_id`, `product_name`, `price`.
Ваша задача: написать программу, которая объединяет данные из этих двух топиков и выводит итоговую информацию о каждом заказе, включая название продукта и общую стоимость.
Решение:
```python
from confluent_kafka import Consumer
import json
# Настройки Kafka
broker = 'localhost:9092'
# Создание консьюмеров для обоих топиков
order_consumer = Consumer({
'bootstrap.servers': broker,
'group.id': 'order-group',
'auto.offset.reset': 'earliest'
})
product_consumer = Consumer({
'bootstrap.servers': broker,
'group.id': 'product-group',
'auto.offset.reset': 'earliest'
})
order_consumer.subscribe(['orders'])
product_consumer.subscribe(['products'])
# Словарь для хранения данных о товарах
product_catalog = {}
try:
while True:
# Чтение данных из топика products
product_msg = product_consumer.poll(0.1)
if product_msg and not product_msg.error():
product = json.loads(product_msg.value().decode('utf-8'))
product_catalog[product['product_id']] = {
'name': product['product_name'],
'price': product['price']
}
# Чтение данных из топика orders
order_msg = order_consumer.poll(0.1)
if order_msg and not order_msg.error():
order = json.loads(order_msg.value().decode('utf-8'))
product_id = order['product_id']
# Объединение данных о заказе и товаре
if product_id in product_catalog:
product = product_catalog[product_id]
total_price = order['quantity'] * product['price']
print(f"Заказ {order['order_id']}: {product['name']} x {order['quantity']} = {total_price} $")
else:
print(f"Информация о товаре {product_id} отсутствует.")
except KeyboardInterrupt:
print("Завершение работы.")
finally:
order_consumer.close()
product_consumer.close()
```
Объяснение:
– Данные из топика `products` кэшируются в словаре `product_catalog`.
– При чтении заказа из топика `orders` программа объединяет данные и вычисляет итоговую стоимость.
Задача 7: Потоковая обработка с вычислением скользящего среднего
Описание:
В топик `stock_prices` поступают данные о ценах акций:
– `symbol` – тикер акции.
– `price` – текущая цена.
– `timestamp` – время.
Ваша задача: вычислять скользящее среднее цены акции за последние 5 сообщений для каждого тикера.
Решение:
```python
from confluent_kafka import Consumer
import json
from collections import defaultdict, deque
# Настройки Kafka
broker = 'localhost:9092'
# Создание консьюмера
consumer = Consumer({
'bootstrap.servers': broker,
'group.id': 'stocks-group',
'auto.offset.reset': 'earliest'
})
consumer.subscribe(['stock_prices'])
# Дек для хранения последних цен по тикерам
price_window = defaultdict(lambda: deque(maxlen=5))
try:
while True:
msg = consumer.poll(1.0)
if msg is None:
continue
if msg.error():
continue
# Преобразуем сообщение в Python-объект
stock_data = json.loads(msg.value().decode('utf-8'))
# Добавляем цену в окно
symbol = stock_data['symbol']
price_window[symbol].append(stock_data['price'])
# Вычисляем скользящее среднее
moving_average = sum(price_window[symbol]) / len(price_window[symbol])
print(f"Скользящее среднее для {symbol}: {moving_average:.2f}")
except KeyboardInterrupt:
print("Завершение работы.")
finally:
consumer.close()
```
Объяснение:
– Используется `deque` для хранения последних 5 цен.
– Скользящее среднее вычисляется как сумма значений, делённая на их количество.
Задача 8: Генерация уведомлений
Описание:
В топик `user_actions` поступают данные о действиях пользователей:
– `user_id` – идентификатор пользователя.
– `action` – выполненное действие (например, "login", "purchase").
Напишите программу, которая отслеживает пользователей, выполнивших вход (`login`), но не совершивших покупку (`purchase`) в течение 10 минут, и отправляет уведомление в топик `notifications`.
Решение:
```python
from confluent_kafka import Consumer, Producer
import json
from datetime import datetime, timedelta
# Настройки Kafka
broker = 'localhost:9092'
# Создание консьюмера
consumer = Consumer({
'bootstrap.servers': broker,
'group.id': 'user-actions-group',
'auto.offset.reset': 'earliest'
})
producer = Producer({'bootstrap.servers': broker})
consumer.subscribe(['user_actions'])
# Словарь для отслеживания пользователей
user_login_time = {}
try:
while True:
msg = consumer.poll(1.0)
if msg is None:
continue
if msg.error():
continue
# Преобразуем сообщение в Python-объект
action = json.loads(msg.value().decode('utf-8'))
user_id = action['user_id']
action_type = action['action']
timestamp = datetime.fromisoformat(action['timestamp'])
if action_type == 'login':
user_login_time[user_id] = timestamp
elif action_type == 'purchase' and user_id in user_login_time:
del user_login_time[user_id]
# Проверяем, прошло ли 10 минут
current_time = datetime.now()
for user, login_time in list(user_login_time.items()):
if current_time – login_time > timedelta(minutes=10):
notification = {'user_id': user, 'message': 'Сделайте покупку!'}
producer.produce('notifications', value=json.dumps(notification))
print(f"Уведомление отправлено для пользователя {user}")
del user_login_time[user]
except KeyboardInterrupt:
print("Завершение работы.")
finally:
consumer.close()
```
Объяснение:
– Время входа пользователей сохраняется в словаре.
– Если с момента входа прошло более 10 минут и покупка не совершена, генерируется уведомление.
Эти задачи показывают, как использовать Apache Kafka для решения реальных задач, таких как фильтрация событий, подсчет статистики, агрегация данных и сохранение обработанной информации. Эти примеры помогут вам освоить основные подходы к работе с потоками данных в реальном времени.
SQLAlchemy – это мощная библиотека для работы с базами данных в Python. Она предоставляет инструменты для удобного взаимодействия с реляционными базами данных через ORM (Object Relational Mapping) или с использованием чистого SQL.
Pandas же идеально подходит для анализа данных, но иногда данные, которые мы хотим обработать, хранятся в базах данных. Для этого SQLAlchemy и Pandas можно эффективно интегрировать, чтобы выгружать данные из базы, обрабатывать их в Pandas и сохранять обратно.
Установка и подключение
Для начала работы установите библиотеку SQLAlchemy:
```bash
pip install sqlalchemy
```
Если вы используете SQLite, дополнительных действий не требуется. Для других баз данных, таких как PostgreSQL или MySQL, также потребуется установить драйверы, например:
```bash
pip install psycopg2 # Для PostgreSQL
pip install pymysql # Для MySQL
```
Создайте подключение к базе данных с помощью SQLAlchemy. Например, для SQLite это будет выглядеть так:
```python
from sqlalchemy import create_engine
# Создаем подключение к базе данных SQLite
engine = create_engine('sqlite:///example.db', echo=True)
```
Здесь `echo=True` означает, что в консоль будут выводиться SQL-запросы, выполняемые через SQLAlchemy, что полезно для отладки.
Создание таблиц и работа с ORM
SQLAlchemy поддерживает два основных подхода: работа через ORM и использование SQL-запросов напрямую. Рассмотрим оба.
Создадим таблицу для хранения информации о пользователях:
```python
from sqlalchemy import Table, Column, Integer, String, MetaData
# Создаем метаданные
metadata = MetaData()
# Определяем таблицу
users = Table(
'users', metadata,
Column('id', Integer, primary_key=True),
Column('name', String),
Column('age', Integer),
Column('email', String)
)
# Создаем таблицу в базе данных
metadata.create_all(engine)
```
Теперь таблица `users` создана в базе данных.
Для добавления данных используем объект подключения:
```python
from sqlalchemy import insert
# Подключаемся к базе данных
conn = engine.connect()
# Добавляем данные
insert_query = insert(users).values([
{'name': 'Alice', 'age': 25, 'email': 'alice@example.com'},
{'name': 'Bob', 'age': 30, 'email': 'bob@example.com'},
{'name': 'Charlie', 'age': 35, 'email': 'charlie@example.com'}
])
conn.execute(insert_query)
print("Данные добавлены в таблицу.")
```
Чтение данных и интеграция с Pandas
Чтобы выгрузить данные из базы данных в Pandas, SQLAlchemy предоставляет удобный метод. Используем Pandas для выполнения SQL-запроса:
```python
import pandas as pd
# Чтение данных из таблицы users
query = "SELECT * FROM users"
df = pd.read_sql(query, engine)
print(df)
```
Вывод будет выглядеть так:
```
id name age email
0 1 Alice 25 alice@example.com
1 2 Bob 30 bob@example.com
2 3 Charlie 35 charlie@example.com
```
Теперь данные из базы данных доступны в формате DataFrame, и вы можете применять к ним все мощные инструменты анализа, которые предоставляет Pandas.
Обработка данных с использованием Pandas
Допустим, мы хотим найти всех пользователей старше 30 лет и добавить новый столбец с доменом их электронной почты.
```python
# Фильтрация пользователей старше 30 лет
filtered_df = df[df['age'] > 30]
# Добавление нового столбца с доменом электронной почты
filtered_df['email_domain'] = filtered_df['email'].apply(lambda x: x.split('@')[1])
print(filtered_df)
```
Результат будет выглядеть так:
```
id name age email email_domain
2 3 Charlie 35 charlie@example.com example.com
```
Сохранение данных обратно в базу
После обработки данных в Pandas мы можем сохранить их обратно в базу данных. Для этого Pandas предоставляет метод `to_sql`:
```python
# Сохранение отфильтрованных данных в новую таблицу filtered_users
filtered_df.to_sql('filtered_users', engine, if_exists='replace', index=False)
print("Данные сохранены в таблицу filtered_users.")
```
Теперь в базе данных появилась новая таблица `filtered_users`, содержащая обработанные данные.
Работа с ORM
Для более сложных сценариев SQLAlchemy поддерживает ORM, позволяющий работать с таблицами как с Python-классами.
Определим класс для таблицы `users`:
```python
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
Base = declarative_base()
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
name = Column(String)
age = Column(Integer)
email = Column(String)
# Создаем сессию для работы с ORM
Session = sessionmaker(bind=engine)
session = Session()
О проекте
О подписке