Real-time data processing refers to the immediate analysis and transformation of data as it is generated or received. Unlike batch processing, where data is collected over a period and then processed in one go, real-time processing operates on data in a continuous, streaming manner. This allows for timely decision-making and quick responses to events.
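To make the contrast concrete, here is a minimal sketch in plain Python: the batch version waits for all records before producing a result, while the streaming version updates its result as each record arrives.
records = [10, 15, 20]

# Batch: collect everything, then process in one go
print(f'Batch average: {sum(records) / len(records)}')

# Streaming: react to each record as it arrives
total, count = 0, 0
for value in records:
    total += value
    count += 1
    print(f'Average so far: {total / count}')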
Pandas is a popular library for data manipulation and analysis. While it is more commonly used for batch processing, it can also be used for simple real-time data handling.
import pandas as pd

# Simulate real-time data ingestion
data_stream = [{'timestamp': '2023-10-01 10:00:00', 'value': 10},
               {'timestamp': '2023-10-01 10:01:00', 'value': 15},
               {'timestamp': '2023-10-01 10:02:00', 'value': 20}]

# Simple processing: update a running average as each record arrives
running_avg = 0.0
count = 0
for data in data_stream:
    df = pd.DataFrame([data])
    count += 1
    running_avg += (df['value'].iloc[0] - running_avg) / count
    print(f"Running average: {running_avg}")
Kafka is a distributed streaming platform, and kafka-python is a Python client for interacting with it. It is useful for handling high-volume, real-time data streams.
from kafka import KafkaProducer, KafkaConsumer
import json

# Produce data to Kafka
producer = KafkaProducer(bootstrap_servers='localhost:9092',
                         value_serializer=lambda v: json.dumps(v).encode('utf-8'))
data = {'message': 'Hello, Kafka!'}
producer.send('test_topic', value=data)
producer.flush()

# Consume data from Kafka
consumer = KafkaConsumer('test_topic',
                         bootstrap_servers='localhost:9092',
                         value_deserializer=lambda m: json.loads(m.decode('utf-8')))
for message in consumer:
    print(message.value)
Apache Beam is a unified model for defining both batch and streaming data processing pipelines. The apache-beam Python SDK allows you to build scalable real-time data processing pipelines.
import apache_beam as beam

# Define a pipeline
with beam.Pipeline() as p:
    # Simulate a data source
    data = p | beam.Create([1, 2, 3, 4, 5])
    # Process the data
    squared = data | beam.Map(lambda x: x * x)
    # Print the results
    squared | beam.Map(print)
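For streaming work, Beam's windowing is the key primitive: records are grouped by event time and aggregated per window. A minimal sketch that sums values in 2-second fixed windows (the timestamps here are synthetic, attached by hand for illustration):
import apache_beam as beam
from apache_beam.transforms.window import FixedWindows, TimestampedValue

with beam.Pipeline() as p:
    (p
     | beam.Create([(0, 10), (1, 15), (2, 20), (3, 25)])    # (seconds, value)
     | beam.Map(lambda kv: TimestampedValue(kv[1], kv[0]))  # attach event-time timestamps
     | beam.WindowInto(FixedWindows(2))                     # 2-second fixed windows
     | beam.CombineGlobally(sum).without_defaults()         # one sum per window
     | beam.Map(print))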
You can also treat a growing log file as a data stream by polling it for new lines, similar to the Unix tail -f command.
import time

with open('data.log', 'r') as f:
    while True:
        line = f.readline()
        if not line:
            time.sleep(1)
            continue
        print(line.strip())
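The loop above replays the whole file before waiting for new data. A small variant (assuming the same data.log) seeks to the end at startup so only newly appended lines are processed:
import os
import time

with open('data.log', 'r') as f:
    f.seek(0, os.SEEK_END)  # skip existing content
    while True:
        line = f.readline()
        if not line:
            time.sleep(1)
            continue
        print(line.strip())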
For network data streams, you can use Python's built-in socket module to establish connections and receive data.
import socket
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.bind(('localhost', 8000))
server_socket.listen(1)
print('Waiting for a connection...')
conn, addr = server_socket.accept()
print(f'Connected by {addr}')
while True:
    data = conn.recv(1024)
    if not data:
        break
    print(data.decode())
conn.close()
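To exercise the server, a client can connect to the same host and port and send a few bytes; a minimal sketch:
import socket

client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect(('localhost', 8000))
client.sendall(b'Hello, server!')
client.close()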
A common processing step is filtering, for example keeping only the records that satisfy a condition:
data = [1, 2, 3, 4, 5]
filtered_data = [x for x in data if x > 3]
print(filtered_data)
Aggregation, such as computing an average with NumPy, is another frequent operation:
import numpy as np

data = [1, 2, 3, 4, 5]
average = np.mean(data)
print(average)
Processed results can be written to a file, for example in CSV format:
processed_data = [{'name': 'Alice', 'score': 85}, {'name': 'Bob', 'score': 90}]
with open('output.csv', 'w') as f:
    f.write('name,score\n')
    for item in processed_data:
        f.write(f"{item['name']},{item['score']}\n")
You can also use database libraries such as sqlite3 or psycopg2 to store results in a database.
import sqlite3
conn = sqlite3.connect('example.db')
c = conn.cursor()
c.execute('''CREATE TABLE IF NOT EXISTS scores
(name TEXT, score INTEGER)''')
data = [('Alice', 85), ('Bob', 90)]
c.executemany('INSERT INTO scores VALUES (?,?)', data)
conn.commit()
conn.close()
When dealing with real-time data, errors can occur due to network issues, data format problems, or resource limitations. It is important to implement proper error handling to ensure the stability of the data processing system.
try:
    # Code for data ingestion or processing
    result = 1 / 0
except ZeroDivisionError:
    print('Error: Division by zero')
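For transient failures such as network errors, a retry loop with exponential backoff is a common pattern; a minimal sketch (process and record are hypothetical placeholders for your own processing function and data):
import time

def process_with_retry(process, record, max_retries=3):
    for attempt in range(max_retries):
        try:
            return process(record)
        except ConnectionError as e:
            wait = 2 ** attempt  # back off: 1s, 2s, 4s, ...
            print(f'Attempt {attempt + 1} failed ({e}); retrying in {wait}s')
            time.sleep(wait)
    raise RuntimeError('All retries failed')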
As the volume of data increases, the processing system needs to scale vertically (more powerful machines) or horizontally (more machines working in parallel). Distributed systems like Apache Kafka and Apache Beam can help handle large-scale data streams, as sketched below.
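With Kafka, for example, horizontal scaling typically means running several consumers with the same group_id so they split the topic's partitions between them; a minimal sketch using kafka-python (the topic and group names are placeholders):
from kafka import KafkaConsumer
import json

# Start several copies of this process; consumers sharing a group_id
# divide the topic's partitions among themselves
consumer = KafkaConsumer('test_topic',
                         bootstrap_servers='localhost:9092',
                         group_id='processing-group',
                         value_deserializer=lambda m: json.loads(m.decode('utf-8')))
for message in consumer:
    print(message.value)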
Keep track of the data processing system’s performance and log important events. This can help in debugging and optimizing the system.
import logging

logging.basicConfig(level=logging.INFO)
logging.info('Data processing started')
try:
    # Data processing code
    pass
except Exception as e:
    logging.error(f'Error occurred: {e}')
logging.info('Data processing completed')
Asynchronous programming can improve the performance of real-time data processing by allowing the program to perform other tasks while waiting for I/O operations. The asyncio library in Python can be used for this purpose.
import asyncio

async def process_data(data):
    print(f'Processing data: {data}')
    await asyncio.sleep(1)
    print(f'Data {data} processed')

async def main():
    tasks = [process_data(i) for i in range(5)]
    await asyncio.gather(*tasks)

asyncio.run(main())
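asyncio also ships a streams API, which suits real-time network ingestion well: each client connection is handled concurrently without threads. A minimal sketch of a line-oriented server (the host and port are arbitrary):
import asyncio

async def handle_client(reader, writer):
    # Process lines from each client connection as they arrive
    while True:
        line = await reader.readline()
        if not line:
            break
        print(f'Received: {line.decode().strip()}')
    writer.close()
    await writer.wait_closed()

async def main():
    server = await asyncio.start_server(handle_client, 'localhost', 8000)
    async with server:
        await server.serve_forever()

asyncio.run(main())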
Since real-time data processing often deals with large volumes of data, it is important to optimize memory usage. Use generators and iterators instead of loading all data into memory at once.
# Using a generator to read a large file lazily, one line at a time
def read_large_file(file_path):
    with open(file_path, 'r') as f:
        for line in f:
            yield line

for line in read_large_file('large_file.txt'):
    print(line.strip())
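The same idea applies to tabular data: pandas can read a large CSV in chunks instead of loading it whole (the file name and column are hypothetical):
import pandas as pd

# Process a large CSV 1000 rows at a time
for chunk in pd.read_csv('large_file.csv', chunksize=1000):
    print(chunk['value'].mean())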
Protect the data being processed by following security best practices such as encryption, access control, and data masking.
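As one illustration of data masking, direct identifiers can be replaced with a one-way hash before records flow through the rest of the pipeline; a minimal sketch (the field names are hypothetical):
import hashlib

def mask_pii(record):
    # Replace the identifier with a truncated SHA-256 digest
    masked = dict(record)
    masked['name'] = hashlib.sha256(record['name'].encode()).hexdigest()[:12]
    return masked

print(mask_pii({'name': 'Alice', 'score': 85}))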
Real-time data processing with Python offers a wide range of possibilities for various applications. By understanding the fundamental concepts, leveraging the right libraries, and following common and best practices, you can build efficient and reliable real-time data processing systems. Python's simplicity and flexibility make it an ideal choice for developers looking to work with real-time data.