Raspberry Pi Serial Communication
Complete guide to PySerial on Raspberry Pi - GPIO UART, USB serial, and hardware-specific configuration for embedded projects
Master serial communication on Raspberry Pi using built-in UART, USB adapters, and GPIO pins for embedded projects.
Hardware Setup
Raspberry Pi has multiple UART interfaces - choose the right one for your project needs.
import serial
import RPi.GPIO as GPIO
import time
# Raspberry Pi GPIO UART Configuration
# GPIO 14 (TXD) - Pin 8
# GPIO 15 (RXD) - Pin 10
# Ground - Pin 6
class RaspberryPiSerial:
def __init__(self, device='/dev/serial0', baudrate=9600):
"""
/dev/serial0 - Primary UART (GPIO 14/15)
/dev/ttyAMA0 - Hardware UART (Pi 3/4)
/dev/ttyS0 - Mini UART (Pi 3/4)
"""
self.device = device
self.baudrate = baudrate
self.serial = None
def connect(self):
"""Connect to Raspberry Pi UART"""
try:
self.serial = serial.Serial(
port=self.device,
baudrate=self.baudrate,
parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE,
bytesize=serial.EIGHTBITS,
timeout=1
)
return True
except serial.SerialException as e:
print(f"Failed to connect: {e}")
return False
def send_command(self, command):
"""Send command and read response"""
if not self.serial:
return None
self.serial.write(command.encode() + b'\r\n')
time.sleep(0.1)
response = b''
while self.serial.in_waiting:
response += self.serial.read(self.serial.in_waiting)
time.sleep(0.1)
return response.decode().strip()
def close(self):
if self.serial:
self.serial.close()
# Basic usage
pi_serial = RaspberryPiSerial('/dev/serial0', 115200)
if pi_serial.connect():
response = pi_serial.send_command('AT')
print(f"Response: {response}")
pi_serial.close()
Enable UART on Raspberry Pi:
# Edit /boot/config.txt
sudo nano /boot/config.txt
# Add these lines:
enable_uart=1
dtoverlay=disable-bt
# Disable serial console (if needed)
sudo systemctl disable serial-getty@ttyAMA0.service
sudo systemctl disable serial-getty@serial0.service
# Reboot
sudo reboot
import serial
import serial.tools.list_ports
import time
class USBSerialManager:
def __init__(self):
self.connections = {}
def list_usb_devices(self):
"""List all USB serial devices"""
ports = serial.tools.list_ports.comports()
usb_ports = []
for port in ports:
if 'USB' in port.description or 'ttyUSB' in port.device:
usb_ports.append({
'device': port.device,
'description': port.description,
'vid': port.vid,
'pid': port.pid,
'serial_number': port.serial_number
})
return usb_ports
def connect_device(self, device_path, baudrate=9600, name=None):
"""Connect to USB serial device"""
if name is None:
name = device_path
try:
ser = serial.Serial(
port=device_path,
baudrate=baudrate,
timeout=1
)
self.connections[name] = ser
print(f"Connected to {device_path} as {name}")
return True
except serial.SerialException as e:
print(f"Failed to connect to {device_path}: {e}")
return False
def send_to_device(self, name, data):
"""Send data to specific device"""
if name not in self.connections:
return False
try:
self.connections[name].write(data.encode())
return True
except:
return False
def read_from_device(self, name, timeout=1):
"""Read from specific device"""
if name not in self.connections:
return None
ser = self.connections[name]
ser.timeout = timeout
data = b''
start_time = time.time()
while time.time() - start_time < timeout:
if ser.in_waiting:
data += ser.read(ser.in_waiting)
time.sleep(0.01)
return data.decode() if data else None
def close_all(self):
"""Close all connections"""
for name, ser in self.connections.items():
ser.close()
self.connections.clear()
# Usage example
manager = USBSerialManager()
# List available devices
devices = manager.list_usb_devices()
print("Available USB devices:")
for device in devices:
print(f" {device['device']}: {device['description']}")
# Connect to devices
if devices:
device_path = devices[0]['device']
if manager.connect_device(device_path, 115200, 'gps'):
manager.send_to_device('gps', 'AT\r\n')
response = manager.read_from_device('gps', timeout=2)
print(f"GPS response: {response}")
manager.close_all()
import serial
import struct
import time
from datetime import datetime
class HardwareUART:
"""Hardware UART interface for high-speed communication"""
def __init__(self, device='/dev/ttyAMA0', baudrate=115200):
self.device = device
self.baudrate = baudrate
self.serial = None
def setup_hardware_uart(self):
"""Configure hardware UART with optimal settings"""
try:
self.serial = serial.Serial(
port=self.device,
baudrate=self.baudrate,
parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE,
bytesize=serial.EIGHTBITS,
timeout=0.1,
write_timeout=0.1,
# Hardware flow control
rtscts=True,
dsrdtr=False
)
# Configure buffers for high throughput
self.serial.set_buffer_size(rx_size=4096, tx_size=4096)
return True
except serial.SerialException as e:
print(f"UART setup failed: {e}")
return False
def high_speed_data_logger(self, duration_seconds=10):
"""High-speed data logging example"""
if not self.serial:
return
print(f"Starting high-speed logging for {duration_seconds} seconds...")
data_log = []
start_time = time.time()
bytes_received = 0
while time.time() - start_time < duration_seconds:
if self.serial.in_waiting:
chunk = self.serial.read(self.serial.in_waiting)
timestamp = datetime.now()
data_log.append({
'timestamp': timestamp,
'data': chunk,
'size': len(chunk)
})
bytes_received += len(chunk)
# Calculate throughput
elapsed = time.time() - start_time
throughput = bytes_received / elapsed
print(f"Logging complete:")
print(f" Duration: {elapsed:.2f} seconds")
print(f" Bytes received: {bytes_received:,}")
print(f" Throughput: {throughput:.0f} bytes/second")
print(f" Data chunks: {len(data_log)}")
return data_log
def binary_protocol_handler(self):
"""Handle binary protocol communication"""
if not self.serial:
return
# Example: Custom binary protocol
# Header: [0xAA, 0xBB, LENGTH, CMD]
# Data: [DATA...]
# Checksum: [CRC16]
def send_binary_command(cmd, data=b''):
header = struct.pack('>BBBB', 0xAA, 0xBB, len(data), cmd)
# Calculate CRC16 (simplified)
crc = sum(header + data) & 0xFFFF
crc_bytes = struct.pack('>H', crc)
packet = header + data + crc_bytes
self.serial.write(packet)
def read_binary_response():
# Read header
header = self.serial.read(4)
if len(header) != 4 or header[:2] != b'\xAA\xBB':
return None
length = header[2]
cmd = header[3]
# Read data and checksum
payload = self.serial.read(length + 2)
if len(payload) != length + 2:
return None
data = payload[:-2]
received_crc = struct.unpack('>H', payload[-2:])[0]
# Verify checksum
calculated_crc = sum(header + data) & 0xFFFF
if received_crc != calculated_crc:
return None
return {
'cmd': cmd,
'data': data,
'length': length
}
# Example usage
send_binary_command(0x01, b'Hello Pi') # Send command 0x01
response = read_binary_response()
if response:
print(f"Received command {response['cmd']}: {response['data']}")
def close(self):
if self.serial:
self.serial.close()
# Configure Raspberry Pi for hardware UART
# Add to /boot/config.txt:
# dtoverlay=uart2
# dtoverlay=uart3
# dtoverlay=uart4
# dtoverlay=uart5
# Usage
uart = HardwareUART('/dev/ttyAMA0', 230400)
if uart.setup_hardware_uart():
# Run high-speed data logging
log_data = uart.high_speed_data_logger(5)
# Handle binary protocol
uart.binary_protocol_handler()
uart.close()
GPIO Integration
Combine serial communication with GPIO control for complete hardware integration.
import serial
import RPi.GPIO as GPIO
import time
import threading
from enum import Enum
class PinMode(Enum):
INPUT = GPIO.IN
OUTPUT = GPIO.OUT
class RaspberryPiController:
def __init__(self, serial_device='/dev/serial0', baudrate=9600):
# Setup GPIO
GPIO.setmode(GPIO.BCM)
GPIO.setwarnings(False)
# Serial setup
self.serial = serial.Serial(serial_device, baudrate, timeout=1)
# Pin configurations
self.pin_configs = {}
self.pwm_objects = {}
# Control flags
self.running = False
self.monitor_thread = None
def setup_pin(self, pin, mode, initial_value=None, pull_up_down=None):
"""Configure GPIO pin"""
self.pin_configs[pin] = mode
if mode == PinMode.OUTPUT:
GPIO.setup(pin, GPIO.OUT)
if initial_value is not None:
GPIO.output(pin, initial_value)
else:
pull = GPIO.PUD_OFF
if pull_up_down == 'up':
pull = GPIO.PUD_UP
elif pull_up_down == 'down':
pull = GPIO.PUD_DOWN
GPIO.setup(pin, GPIO.IN, pull_up_down=pull)
def setup_pwm(self, pin, frequency=1000):
"""Setup PWM on pin"""
self.setup_pin(pin, PinMode.OUTPUT)
pwm = GPIO.PWM(pin, frequency)
self.pwm_objects[pin] = pwm
pwm.start(0) # Start with 0% duty cycle
return pwm
def digital_write(self, pin, value):
"""Write digital value to pin"""
if pin in self.pin_configs and self.pin_configs[pin] == PinMode.OUTPUT:
GPIO.output(pin, value)
def digital_read(self, pin):
"""Read digital value from pin"""
if pin in self.pin_configs and self.pin_configs[pin] == PinMode.INPUT:
return GPIO.input(pin)
return None
def pwm_write(self, pin, duty_cycle):
"""Set PWM duty cycle (0-100)"""
if pin in self.pwm_objects:
self.pwm_objects[pin].ChangeDutyCycle(duty_cycle)
def serial_gpio_bridge(self):
"""Bridge serial commands to GPIO operations"""
command_map = {
b'LED_ON': lambda: self.digital_write(18, GPIO.HIGH),
b'LED_OFF': lambda: self.digital_write(18, GPIO.LOW),
b'READ_SENSOR': lambda: self.digital_read(24),
b'PWM_50': lambda: self.pwm_write(12, 50),
b'PWM_100': lambda: self.pwm_write(12, 100)
}
while self.running:
if self.serial.in_waiting:
command = self.serial.readline().strip()
if command in command_map:
result = command_map[command]()
# Send response back
if result is not None:
response = f"RESULT:{result}\n"
self.serial.write(response.encode())
else:
self.serial.write(b"OK\n")
else:
self.serial.write(b"UNKNOWN_COMMAND\n")
time.sleep(0.01)
def sensor_data_logger(self, sensors, log_interval=1.0):
"""Log sensor data via serial"""
def logging_worker():
while self.running:
readings = {}
timestamp = time.time()
for sensor_name, pin in sensors.items():
readings[sensor_name] = self.digital_read(pin)
# Format as CSV
log_entry = f"{timestamp:.3f}"
for sensor_name, value in readings.items():
log_entry += f",{sensor_name}:{value}"
log_entry += "\n"
self.serial.write(log_entry.encode())
time.sleep(log_interval)
self.monitor_thread = threading.Thread(target=logging_worker)
self.monitor_thread.daemon = True
self.monitor_thread.start()
def start(self):
"""Start controller operations"""
self.running = True
# Setup common pins
self.setup_pin(18, PinMode.OUTPUT) # LED
self.setup_pin(24, PinMode.INPUT, pull_up_down='up') # Button/Sensor
self.setup_pwm(12, 1000) # PWM output
# Start serial-GPIO bridge
bridge_thread = threading.Thread(target=self.serial_gpio_bridge)
bridge_thread.daemon = True
bridge_thread.start()
print("Raspberry Pi controller started")
def cleanup(self):
"""Clean up resources"""
self.running = False
if self.monitor_thread:
self.monitor_thread.join()
# Stop PWM
for pwm in self.pwm_objects.values():
pwm.stop()
# Cleanup GPIO
GPIO.cleanup()
# Close serial
self.serial.close()
print("Cleanup complete")
# Usage example
controller = RaspberryPiController('/dev/serial0', 115200)
controller.start()
try:
# Define sensors to monitor
sensors = {
'temperature': 24,
'motion': 25,
'light': 23
}
# Start sensor logging
controller.sensor_data_logger(sensors, log_interval=0.5)
# Manual control examples
controller.digital_write(18, GPIO.HIGH) # Turn on LED
time.sleep(1)
controller.pwm_write(12, 75) # Set PWM to 75%
# Keep running
input("Press Enter to stop...")
finally:
controller.cleanup()
IoT and Cloud Integration
MQTT Bridge
import serial
import json
import time
import threading
import paho.mqtt.client as mqtt
from datetime import datetime
class SerialMQTTBridge:
def __init__(self, serial_port, baudrate, mqtt_broker, mqtt_port=1883):
# Serial setup
self.serial = serial.Serial(serial_port, baudrate, timeout=1)
# MQTT setup
self.mqtt_client = mqtt.Client()
self.mqtt_broker = mqtt_broker
self.mqtt_port = mqtt_port
# Topics
self.data_topic = "rpi/serial/data"
self.command_topic = "rpi/serial/command"
self.status_topic = "rpi/serial/status"
# State
self.running = False
self.device_id = f"rpi-{int(time.time())}"
def setup_mqtt(self):
"""Configure MQTT client"""
def on_connect(client, userdata, flags, rc):
print(f"MQTT connected with code {rc}")
client.subscribe(self.command_topic)
# Publish online status
status_msg = {
'device_id': self.device_id,
'status': 'online',
'timestamp': datetime.now().isoformat()
}
client.publish(self.status_topic, json.dumps(status_msg))
def on_message(client, userdata, msg):
"""Handle incoming MQTT commands"""
try:
command = json.loads(msg.payload.decode())
self.handle_mqtt_command(command)
except json.JSONDecodeError:
# Treat as raw command
self.serial.write(msg.payload + b'\n')
self.mqtt_client.on_connect = on_connect
self.mqtt_client.on_message = on_message
# Set last will (offline status)
offline_msg = {
'device_id': self.device_id,
'status': 'offline',
'timestamp': datetime.now().isoformat()
}
self.mqtt_client.will_set(
self.status_topic,
json.dumps(offline_msg),
retain=True
)
def handle_mqtt_command(self, command):
"""Process structured MQTT command"""
cmd_type = command.get('type')
if cmd_type == 'raw':
# Send raw data to serial
data = command.get('data', '')
self.serial.write(data.encode() + b'\n')
elif cmd_type == 'gpio':
# GPIO control command
pin = command.get('pin')
action = command.get('action')
value = command.get('value')
gpio_command = f"GPIO {pin} {action} {value}\n"
self.serial.write(gpio_command.encode())
elif cmd_type == 'config':
# Configuration command
setting = command.get('setting')
value = command.get('value')
config_command = f"CONFIG {setting} {value}\n"
self.serial.write(config_command.encode())
def serial_to_mqtt_worker(self):
"""Forward serial data to MQTT"""
buffer = b''
while self.running:
if self.serial.in_waiting:
data = self.serial.read(self.serial.in_waiting)
buffer += data
# Process complete lines
while b'\n' in buffer:
line, buffer = buffer.split(b'\n', 1)
line_str = line.decode('utf-8', errors='ignore').strip()
if line_str:
# Create structured message
message = {
'device_id': self.device_id,
'timestamp': datetime.now().isoformat(),
'data': line_str,
'raw': True
}
# Try to parse as JSON for structured data
try:
parsed_data = json.loads(line_str)
message['data'] = parsed_data
message['raw'] = False
except json.JSONDecodeError:
pass
# Publish to MQTT
self.mqtt_client.publish(
self.data_topic,
json.dumps(message)
)
time.sleep(0.01)
def start(self):
"""Start the bridge"""
self.setup_mqtt()
# Connect to MQTT broker
self.mqtt_client.connect(self.mqtt_broker, self.mqtt_port, 60)
self.mqtt_client.loop_start()
# Start serial reader
self.running = True
self.serial_thread = threading.Thread(target=self.serial_to_mqtt_worker)
self.serial_thread.daemon = True
self.serial_thread.start()
print(f"Serial-MQTT bridge started (Device ID: {self.device_id})")
def stop(self):
"""Stop the bridge"""
self.running = False
# Publish offline status
offline_msg = {
'device_id': self.device_id,
'status': 'offline',
'timestamp': datetime.now().isoformat()
}
self.mqtt_client.publish(self.status_topic, json.dumps(offline_msg))
# Cleanup
if hasattr(self, 'serial_thread'):
self.serial_thread.join()
self.mqtt_client.loop_stop()
self.mqtt_client.disconnect()
self.serial.close()
print("Bridge stopped")
# Usage
bridge = SerialMQTTBridge('/dev/serial0', 115200, 'localhost')
bridge.start()
try:
# Keep bridge running
while True:
time.sleep(1)
except KeyboardInterrupt:
bridge.stop()
REST API Server
import serial
import json
import threading
import time
from flask import Flask, request, jsonify
from datetime import datetime
from collections import deque
app = Flask(__name__)
class SerialAPIServer:
def __init__(self, serial_port, baudrate):
self.serial = serial.Serial(serial_port, baudrate, timeout=1)
self.data_buffer = deque(maxlen=1000) # Store last 1000 messages
self.running = False
self.lock = threading.Lock()
def start_serial_reader(self):
"""Background thread to read serial data"""
def reader():
buffer = b''
while self.running:
if self.serial.in_waiting:
data = self.serial.read(self.serial.in_waiting)
buffer += data
while b'\n' in buffer:
line, buffer = buffer.split(b'\n', 1)
line_str = line.decode('utf-8', errors='ignore').strip()
if line_str:
with self.lock:
self.data_buffer.append({
'timestamp': datetime.now().isoformat(),
'data': line_str
})
time.sleep(0.01)
self.running = True
self.reader_thread = threading.Thread(target=reader)
self.reader_thread.daemon = True
self.reader_thread.start()
def send_command(self, command):
"""Send command to serial device"""
try:
self.serial.write(command.encode() + b'\n')
return True
except:
return False
def get_recent_data(self, count=10):
"""Get recent data messages"""
with self.lock:
return list(self.data_buffer)[-count:]
def stop(self):
self.running = False
if hasattr(self, 'reader_thread'):
self.reader_thread.join()
self.serial.close()
# Initialize serial API
serial_api = SerialAPIServer('/dev/serial0', 115200)
serial_api.start_serial_reader()
@app.route('/api/serial/send', methods=['POST'])
def send_serial_command():
"""Send command to serial device"""
data = request.get_json()
command = data.get('command', '')
if serial_api.send_command(command):
return jsonify({'status': 'success', 'command': command})
else:
return jsonify({'status': 'error', 'message': 'Failed to send command'}), 500
@app.route('/api/serial/data', methods=['GET'])
def get_serial_data():
"""Get recent serial data"""
count = request.args.get('count', 10, type=int)
data = serial_api.get_recent_data(count)
return jsonify({'status': 'success', 'data': data})
@app.route('/api/serial/stream')
def stream_serial_data():
"""Server-sent events stream of serial data"""
def generate():
last_count = 0
while True:
current_data = serial_api.get_recent_data(1000)
if len(current_data) > last_count:
new_data = current_data[last_count:]
for item in new_data:
yield f"data: {json.dumps(item)}\n\n"
last_count = len(current_data)
time.sleep(0.1)
return app.response_class(
generate(),
mimetype='text/event-stream'
)
@app.route('/api/system/info')
def system_info():
"""Get system information"""
import psutil
import os
return jsonify({
'raspberry_pi': {
'model': open('/proc/device-tree/model', 'rb').read().decode().strip('\x00'),
'serial_number': open('/proc/cpuinfo').read().split('Serial\t\t:')[-1].strip()[:16],
'cpu_temp': float(open('/sys/class/thermal/thermal_zone0/temp').read()) / 1000,
'uptime': open('/proc/uptime').read().split()[0]
},
'system': {
'cpu_percent': psutil.cpu_percent(),
'memory_percent': psutil.virtual_memory().percent,
'disk_percent': psutil.disk_usage('/').percent
}
})
if __name__ == '__main__':
try:
app.run(host='0.0.0.0', port=5000, debug=False)
finally:
serial_api.stop()
Data Logging and Analysis
import serial
import sqlite3
import json
import threading
import time
import pandas as pd
import matplotlib.pyplot as plt
from datetime import datetime, timedelta
class SerialDataLogger:
def __init__(self, serial_port, baudrate, db_path='serial_data.db'):
self.serial = serial.Serial(serial_port, baudrate, timeout=1)
self.db_path = db_path
self.running = False
# Initialize database
self.init_database()
def init_database(self):
"""Initialize SQLite database"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS serial_data (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TEXT NOT NULL,
raw_data TEXT NOT NULL,
parsed_data TEXT,
data_type TEXT,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
)
''')
cursor.execute('''
CREATE TABLE IF NOT EXISTS sensor_readings (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TEXT NOT NULL,
sensor_type TEXT NOT NULL,
value REAL NOT NULL,
unit TEXT,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
)
''')
conn.commit()
conn.close()
def parse_sensor_data(self, raw_data):
"""Parse common sensor data formats"""
try:
# Try JSON format
data = json.loads(raw_data)
return data, 'json'
except json.JSONDecodeError:
pass
# Try CSV format: sensor_type,value,unit
parts = raw_data.split(',')
if len(parts) == 3:
return {
'sensor_type': parts[0].strip(),
'value': float(parts[1].strip()),
'unit': parts[2].strip()
}, 'csv'
# Try key-value format: temp=25.5C
if '=' in raw_data:
pairs = {}
for pair in raw_data.split(','):
if '=' in pair:
key, value = pair.split('=', 1)
pairs[key.strip()] = value.strip()
return pairs, 'keyvalue'
return None, 'unknown'
def log_data(self, raw_data):
"""Log data to database"""
timestamp = datetime.now().isoformat()
parsed_data, data_type = self.parse_sensor_data(raw_data)
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
# Log raw data
cursor.execute('''
INSERT INTO serial_data (timestamp, raw_data, parsed_data, data_type)
VALUES (?, ?, ?, ?)
''', (timestamp, raw_data, json.dumps(parsed_data) if parsed_data else None, data_type))
# Log parsed sensor readings
if parsed_data and isinstance(parsed_data, dict):
if 'sensor_type' in parsed_data and 'value' in parsed_data:
cursor.execute('''
INSERT INTO sensor_readings (timestamp, sensor_type, value, unit)
VALUES (?, ?, ?, ?)
''', (
timestamp,
parsed_data['sensor_type'],
parsed_data['value'],
parsed_data.get('unit', '')
))
else:
# Handle multiple sensors in one message
for key, value in parsed_data.items():
try:
numeric_value = float(str(value).rstrip('C°F%'))
unit = str(value)[-1] if str(value)[-1] in 'C°F%' else ''
cursor.execute('''
INSERT INTO sensor_readings (timestamp, sensor_type, value, unit)
VALUES (?, ?, ?, ?)
''', (timestamp, key, numeric_value, unit))
except ValueError:
pass # Skip non-numeric values
conn.commit()
conn.close()
def data_logger_worker(self):
"""Background worker for logging data"""
buffer = b''
while self.running:
if self.serial.in_waiting:
data = self.serial.read(self.serial.in_waiting)
buffer += data
while b'\n' in buffer:
line, buffer = buffer.split(b'\n', 1)
line_str = line.decode('utf-8', errors='ignore').strip()
if line_str:
self.log_data(line_str)
print(f"Logged: {line_str}")
time.sleep(0.01)
def start_logging(self):
"""Start data logging"""
self.running = True
self.logger_thread = threading.Thread(target=self.data_logger_worker)
self.logger_thread.daemon = True
self.logger_thread.start()
print("Data logging started")
def stop_logging(self):
"""Stop data logging"""
self.running = False
if hasattr(self, 'logger_thread'):
self.logger_thread.join()
self.serial.close()
print("Data logging stopped")
def generate_report(self, hours=24):
"""Generate analysis report"""
end_time = datetime.now()
start_time = end_time - timedelta(hours=hours)
conn = sqlite3.connect(self.db_path)
# Get sensor data
df = pd.read_sql_query('''
SELECT timestamp, sensor_type, value, unit
FROM sensor_readings
WHERE timestamp >= ? AND timestamp <= ?
ORDER BY timestamp
''', conn, params=(start_time.isoformat(), end_time.isoformat()))
if df.empty:
print("No data found for the specified time range")
return
# Convert timestamp to datetime
df['timestamp'] = pd.to_datetime(df['timestamp'])
# Generate plots
fig, axes = plt.subplots(2, 2, figsize=(15, 10))
fig.suptitle(f'Sensor Data Analysis - Last {hours} Hours')
# Plot 1: Temperature over time
temp_data = df[df['sensor_type'] == 'temperature']
if not temp_data.empty:
axes[0, 0].plot(temp_data['timestamp'], temp_data['value'])
axes[0, 0].set_title('Temperature Over Time')
axes[0, 0].set_ylabel('Temperature (°C)')
axes[0, 0].tick_params(axis='x', rotation=45)
# Plot 2: Data distribution
sensor_counts = df['sensor_type'].value_counts()
axes[0, 1].pie(sensor_counts.values, labels=sensor_counts.index, autopct='%1.1f%%')
axes[0, 1].set_title('Sensor Data Distribution')
# Plot 3: Value ranges by sensor
df.boxplot(column='value', by='sensor_type', ax=axes[1, 0])
axes[1, 0].set_title('Value Ranges by Sensor Type')
axes[1, 0].set_xlabel('Sensor Type')
# Plot 4: Data frequency over time
hourly_counts = df.set_index('timestamp').resample('H').size()
axes[1, 1].bar(range(len(hourly_counts)), hourly_counts.values)
axes[1, 1].set_title('Data Points Per Hour')
axes[1, 1].set_ylabel('Count')
plt.tight_layout()
plt.savefig('sensor_report.png', dpi=300, bbox_inches='tight')
plt.show()
# Print statistics
print("\n=== Data Summary ===")
print(f"Total data points: {len(df)}")
print(f"Unique sensors: {df['sensor_type'].nunique()}")
print(f"Time range: {df['timestamp'].min()} to {df['timestamp'].max()}")
print("\n=== Sensor Statistics ===")
for sensor_type in df['sensor_type'].unique():
sensor_data = df[df['sensor_type'] == sensor_type]
print(f"\n{sensor_type.upper()}:")
print(f" Count: {len(sensor_data)}")
print(f" Min: {sensor_data['value'].min():.2f}")
print(f" Max: {sensor_data['value'].max():.2f}")
print(f" Mean: {sensor_data['value'].mean():.2f}")
print(f" Std: {sensor_data['value'].std():.2f}")
conn.close()
# Usage example
logger = SerialDataLogger('/dev/serial0', 115200)
logger.start_logging()
try:
# Let it run for a while
time.sleep(60) # Log for 1 minute
# Generate report
logger.generate_report(hours=1)
except KeyboardInterrupt:
pass
finally:
logger.stop_logging()
GPIO UART
Use built-in GPIO pins for direct serial communication
USB Serial
Connect multiple USB serial devices simultaneously
IoT Integration
Bridge serial data to cloud services and APIs
Data Logging
Store and analyze sensor data with SQLite and pandas
Raspberry Pi provides excellent serial communication capabilities for IoT projects. Combine GPIO control with serial interfaces to create powerful embedded applications.
How is this guide?