PySerial
PySerialDocs

Raspberry Pi Serial Communication

Complete guide to PySerial on Raspberry Pi - GPIO UART, USB serial, and hardware-specific configuration for embedded projects

Master serial communication on Raspberry Pi using built-in UART, USB adapters, and GPIO pins for embedded projects.

Hardware Setup

Raspberry Pi has multiple UART interfaces - choose the right one for your project needs.

import serial
import RPi.GPIO as GPIO
import time

# Raspberry Pi GPIO UART Configuration
# GPIO 14 (TXD) - Pin 8
# GPIO 15 (RXD) - Pin 10
# Ground      - Pin 6

class RaspberryPiSerial:
    """Line-oriented command/response helper around a pyserial port.

    Device names on Raspberry Pi:
        /dev/serial0 - Primary UART (GPIO 14/15)
        /dev/ttyAMA0 - Hardware UART (Pi 3/4)
        /dev/ttyS0   - Mini UART (Pi 3/4)
    """

    def __init__(self, device='/dev/serial0', baudrate=9600):
        """Remember the port settings; the port is opened by connect().

        Args:
            device: Path of the serial device to open.
            baudrate: Line speed in baud.
        """
        self.device = device
        self.baudrate = baudrate
        # None until connect() succeeds, and again after close()
        self.serial = None

    def connect(self):
        """Open the port as 8N1 with a 1 second read timeout.

        Returns:
            True when the port was opened, False when pyserial raised
            SerialException (the error is printed).
        """
        try:
            self.serial = serial.Serial(
                port=self.device,
                baudrate=self.baudrate,
                parity=serial.PARITY_NONE,
                stopbits=serial.STOPBITS_ONE,
                bytesize=serial.EIGHTBITS,
                timeout=1
            )
        except serial.SerialException as e:
            print(f"Failed to connect: {e}")
            return False
        return True

    def send_command(self, command):
        """Send *command* terminated by CR LF and collect the reply.

        The reply is gathered by polling in_waiting every 100 ms until the
        input buffer stays empty.

        Args:
            command: Text to send (encoded with the default codec).

        Returns:
            The stripped reply text, or None when the port is not open.
            Bytes that are not valid UTF-8 (line noise, wrong baud rate) are
            replaced with U+FFFD instead of raising UnicodeDecodeError.
        """
        if not self.serial:
            return None

        self.serial.write(command.encode() + b'\r\n')
        time.sleep(0.1)

        chunks = []
        while self.serial.in_waiting:
            chunks.append(self.serial.read(self.serial.in_waiting))
            time.sleep(0.1)

        return b''.join(chunks).decode(errors='replace').strip()

    def close(self):
        """Close the port if it is open; safe to call more than once."""
        if self.serial:
            self.serial.close()
            # Drop the reference so send_command() returns None afterwards
            # instead of writing to a closed port.
            self.serial = None

# Basic usage
pi_serial = RaspberryPiSerial('/dev/serial0', 115200)
if pi_serial.connect():
    try:
        response = pi_serial.send_command('AT')
        print(f"Response: {response}")
    finally:
        # Release the port even when the exchange raises
        pi_serial.close()

Enable UART on Raspberry Pi:

# Edit /boot/config.txt
# (on Raspberry Pi OS Bookworm and later the file is /boot/firmware/config.txt)
sudo nano /boot/config.txt

# Add these lines:
enable_uart=1
dtoverlay=disable-bt

# Disable serial console (if needed)
sudo systemctl disable serial-getty@ttyAMA0.service
sudo systemctl disable serial-getty@serial0.service

# Reboot
sudo reboot
import serial
import serial.tools.list_ports
import time

class USBSerialManager:
    """Registry of open USB serial ports, addressed by a caller-chosen name."""

    def __init__(self):
        # name -> open serial.Serial instance
        self.connections = {}

    @staticmethod
    def _looks_like_usb(port):
        """Return True when a list_ports entry appears to be a USB adapter."""
        description = port.description or ''
        # A USB vendor id is the most reliable marker; it also covers CDC-ACM
        # devices (/dev/ttyACM*) that the name-based checks miss.
        return (
            port.vid is not None
            or 'USB' in description
            or 'ttyUSB' in port.device
        )

    def list_usb_devices(self):
        """List all USB serial devices.

        Returns:
            A list of dicts with the keys 'device', 'description', 'vid',
            'pid' and 'serial_number', one per detected USB serial port.
        """
        return [
            {
                'device': port.device,
                'description': port.description,
                'vid': port.vid,
                'pid': port.pid,
                'serial_number': port.serial_number
            }
            for port in serial.tools.list_ports.comports()
            if self._looks_like_usb(port)
        ]

    def connect_device(self, device_path, baudrate=9600, name=None):
        """Open *device_path* and register it under *name*.

        Args:
            device_path: Serial device to open.
            baudrate: Line speed in baud.
            name: Registry key; defaults to the device path.

        Returns:
            True on success, False when pyserial raised SerialException.
        """
        if name is None:
            name = device_path

        try:
            ser = serial.Serial(
                port=device_path,
                baudrate=baudrate,
                timeout=1
            )
        except serial.SerialException as e:
            print(f"Failed to connect to {device_path}: {e}")
            return False

        # Re-using a name used to orphan the old port while it was still
        # open; close it now that the replacement is available.
        previous = self.connections.get(name)
        if previous is not None:
            previous.close()

        self.connections[name] = ser
        print(f"Connected to {device_path} as {name}")
        return True

    def send_to_device(self, name, data):
        """Send data to specific device.

        Args:
            name: Registry key of the target connection.
            data: Text (encoded with the default codec) or a bytes-like
                object, which is written unchanged.

        Returns:
            True when the write succeeded, False when *name* is unknown or
            the write failed.
        """
        ser = self.connections.get(name)
        if ser is None:
            return False

        payload = data.encode() if isinstance(data, str) else data
        try:
            ser.write(payload)
        except (serial.SerialException, OSError, TypeError):
            # Port failure, or a payload pyserial cannot convert to bytes
            return False
        return True

    def read_from_device(self, name, timeout=1):
        """Read everything that arrives from a device within *timeout* seconds.

        The call always waits the full window, polling every 10 ms. Note that
        the port's own read timeout is set to *timeout* as well.

        Returns:
            The received text (undecodable bytes become U+FFFD), or None when
            *name* is unknown or nothing arrived.
        """
        ser = self.connections.get(name)
        if ser is None:
            return None

        ser.timeout = timeout

        chunks = []
        # monotonic() is immune to wall-clock adjustments during the wait
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            if ser.in_waiting:
                chunks.append(ser.read(ser.in_waiting))
            time.sleep(0.01)

        data = b''.join(chunks)
        return data.decode(errors='replace') if data else None

    def close_all(self):
        """Close all connections, continuing past ports that fail to close."""
        for name, ser in self.connections.items():
            try:
                ser.close()
            except (serial.SerialException, OSError) as e:
                print(f"Failed to close {name}: {e}")
        self.connections.clear()

# Usage example
manager = USBSerialManager()

try:
    # List available devices
    devices = manager.list_usb_devices()
    print("Available USB devices:")
    for device in devices:
        print(f"  {device['device']}: {device['description']}")

    # Connect to devices
    if devices:
        device_path = devices[0]['device']
        if manager.connect_device(device_path, 115200, 'gps'):
            manager.send_to_device('gps', 'AT\r\n')
            response = manager.read_from_device('gps', timeout=2)
            print(f"GPS response: {response}")
finally:
    # Always release the ports, even if the exchange above raises
    manager.close_all()
import serial
import struct
import time
from datetime import datetime

class HardwareUART:
    """Hardware UART interface for high-speed communication.

    Also carries a small example binary protocol with this frame layout:
        header  : 0xAA 0xBB LENGTH CMD  (LENGTH is one byte)
        payload : LENGTH bytes
        trailer : 16-bit big-endian additive checksum over header + payload
    """

    _SYNC = b'\xAA\xBB'
    # LENGTH is a single byte, so a payload can hold at most 255 bytes
    _MAX_PAYLOAD = 0xFF

    def __init__(self, device='/dev/ttyAMA0', baudrate=115200):
        """Remember the port settings; setup_hardware_uart() opens the port."""
        self.device = device
        self.baudrate = baudrate
        self.serial = None

    def setup_hardware_uart(self):
        """Open the port as 8N1 with RTS/CTS flow control and 100 ms timeouts.

        Returns:
            True when the port is ready, False when pyserial raised
            SerialException (the error is printed).
        """
        try:
            port = serial.Serial(
                port=self.device,
                baudrate=self.baudrate,
                parity=serial.PARITY_NONE,
                stopbits=serial.STOPBITS_ONE,
                bytesize=serial.EIGHTBITS,
                timeout=0.1,
                write_timeout=0.1,
                # Hardware flow control
                rtscts=True,
                dsrdtr=False
            )
        except serial.SerialException as e:
            print(f"UART setup failed: {e}")
            return False

        # set_buffer_size() is only provided by pyserial's Windows backend.
        # Calling it unconditionally raises AttributeError on Linux, so the
        # driver buffers are resized only where the method exists.
        resize = getattr(port, 'set_buffer_size', None)
        if resize is not None:
            try:
                resize(rx_size=4096, tx_size=4096)
            except serial.SerialException as e:
                port.close()
                print(f"UART setup failed: {e}")
                return False

        self.serial = port
        return True

    def high_speed_data_logger(self, duration_seconds=10):
        """Record everything received during *duration_seconds*.

        Returns:
            A list of dicts with the keys 'timestamp' (datetime), 'data'
            (bytes) and 'size' (int), one per received chunk; None when the
            port is not open.
        """
        if not self.serial:
            return

        print(f"Starting high-speed logging for {duration_seconds} seconds...")

        data_log = []
        bytes_received = 0
        started = time.monotonic()
        deadline = started + duration_seconds

        while time.monotonic() < deadline:
            # Ask for at least one byte so the read waits on the port's read
            # timeout while the line is idle instead of spinning the CPU.
            chunk = self.serial.read(self.serial.in_waiting or 1)
            if not chunk:
                continue

            data_log.append({
                'timestamp': datetime.now(),
                'data': chunk,
                'size': len(chunk)
            })
            bytes_received += len(chunk)

        # Calculate throughput (guarding against a zero-length run)
        elapsed = time.monotonic() - started
        throughput = bytes_received / elapsed if elapsed > 0 else 0.0

        print(f"Logging complete:")
        print(f"  Duration: {elapsed:.2f} seconds")
        print(f"  Bytes received: {bytes_received:,}")
        print(f"  Throughput: {throughput:.0f} bytes/second")
        print(f"  Data chunks: {len(data_log)}")

        return data_log

    @staticmethod
    def _checksum(frame):
        """Return the 16-bit additive checksum of *frame* (not a real CRC)."""
        return sum(frame) & 0xFFFF

    def send_binary_command(self, cmd, data=b''):
        """Frame *data* under command byte *cmd* and write it to the port.

        Raises:
            ValueError: If *data* is longer than 255 bytes and therefore
                cannot be described by the one-byte length field.
        """
        if len(data) > self._MAX_PAYLOAD:
            raise ValueError(
                f"payload of {len(data)} bytes exceeds the "
                f"{self._MAX_PAYLOAD}-byte limit of the length field"
            )

        header = struct.pack('>BBBB', 0xAA, 0xBB, len(data), cmd)
        trailer = struct.pack('>H', self._checksum(header + data))
        self.serial.write(header + data + trailer)

    def read_binary_response(self):
        """Read and validate one frame from the port.

        Returns:
            A dict with the keys 'cmd', 'data' and 'length', or None when the
            header is short or lacks the sync bytes, the payload is
            truncated, or the checksum does not match.
        """
        header = self.serial.read(4)
        if len(header) != 4 or header[:2] != self._SYNC:
            return None

        length = header[2]
        cmd = header[3]

        # Payload and the two checksum bytes arrive together
        payload = self.serial.read(length + 2)
        if len(payload) != length + 2:
            return None

        data = payload[:-2]
        received = struct.unpack('>H', payload[-2:])[0]
        if received != self._checksum(header + data):
            return None

        return {
            'cmd': cmd,
            'data': data,
            'length': length
        }

    def binary_protocol_handler(self):
        """Demonstrate the binary protocol: send command 0x01, print the reply.

        Does nothing when the port is not open.
        """
        if not self.serial:
            return

        self.send_binary_command(0x01, b'Hello Pi')  # Send command 0x01
        response = self.read_binary_response()
        if response:
            print(f"Received command {response['cmd']}: {response['data']}")

    def close(self):
        """Close the port if it was opened."""
        if self.serial:
            self.serial.close()

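# Optional: enable the additional hardware UARTs.
# These overlays exist on Raspberry Pi 4 (BCM2711); they are not required for
# /dev/ttyAMA0, which the usage example below opens.
# Add to /boot/config.txt:
# dtoverlay=uart2
# dtoverlay=uart3
# dtoverlay=uart4
# dtoverlay=uart5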

# Usage
uart = HardwareUART('/dev/ttyAMA0', 230400)
if uart.setup_hardware_uart():
    try:
        # Run high-speed data logging
        log_data = uart.high_speed_data_logger(5)

        # Handle binary protocol
        uart.binary_protocol_handler()
    finally:
        # Release the port even if logging or the protocol exchange raises
        uart.close()

GPIO Integration

Combine serial communication with GPIO control for complete hardware integration.

import serial
import RPi.GPIO as GPIO
import time
import threading
from enum import Enum

class PinMode(Enum):
    """Direction of a GPIO pin, expressed with the RPi.GPIO direction constants."""
    # Pin is configured with GPIO.setup(pin, GPIO.IN, ...)
    INPUT = GPIO.IN
    # Pin is configured with GPIO.setup(pin, GPIO.OUT)
    OUTPUT = GPIO.OUT

class RaspberryPiController:
    def __init__(self, serial_device='/dev/serial0', baudrate=9600):
        # Setup GPIO
        GPIO.setmode(GPIO.BCM)
        GPIO.setwarnings(False)
        
        # Serial setup
        self.serial = serial.Serial(serial_device, baudrate, timeout=1)
        
        # Pin configurations
        self.pin_configs = {}
        self.pwm_objects = {}
        
        # Control flags
        self.running = False
        self.monitor_thread = None
        
    def setup_pin(self, pin, mode, initial_value=None, pull_up_down=None):
        """Configure GPIO pin"""
        self.pin_configs[pin] = mode
        
        if mode == PinMode.OUTPUT:
            GPIO.setup(pin, GPIO.OUT)
            if initial_value is not None:
                GPIO.output(pin, initial_value)
        else:
            pull = GPIO.PUD_OFF
            if pull_up_down == 'up':
                pull = GPIO.PUD_UP
            elif pull_up_down == 'down':
                pull = GPIO.PUD_DOWN
            GPIO.setup(pin, GPIO.IN, pull_up_down=pull)
            
    def setup_pwm(self, pin, frequency=1000):
        """Setup PWM on pin"""
        self.setup_pin(pin, PinMode.OUTPUT)
        pwm = GPIO.PWM(pin, frequency)
        self.pwm_objects[pin] = pwm
        pwm.start(0)  # Start with 0% duty cycle
        return pwm
        
    def digital_write(self, pin, value):
        """Write digital value to pin"""
        if pin in self.pin_configs and self.pin_configs[pin] == PinMode.OUTPUT:
            GPIO.output(pin, value)
            
    def digital_read(self, pin):
        """Read digital value from pin"""
        if pin in self.pin_configs and self.pin_configs[pin] == PinMode.INPUT:
            return GPIO.input(pin)
        return None
        
    def pwm_write(self, pin, duty_cycle):
        """Set PWM duty cycle (0-100)"""
        if pin in self.pwm_objects:
            self.pwm_objects[pin].ChangeDutyCycle(duty_cycle)
            
    def serial_gpio_bridge(self):
        """Bridge serial commands to GPIO operations"""
        command_map = {
            b'LED_ON': lambda: self.digital_write(18, GPIO.HIGH),
            b'LED_OFF': lambda: self.digital_write(18, GPIO.LOW),
            b'READ_SENSOR': lambda: self.digital_read(24),
            b'PWM_50': lambda: self.pwm_write(12, 50),
            b'PWM_100': lambda: self.pwm_write(12, 100)
        }
        
        while self.running:
            if self.serial.in_waiting:
                command = self.serial.readline().strip()
                
                if command in command_map:
                    result = command_map[command]()
                    
                    # Send response back
                    if result is not None:
                        response = f"RESULT:{result}\n"
                        self.serial.write(response.encode())
                    else:
                        self.serial.write(b"OK\n")
                else:
                    self.serial.write(b"UNKNOWN_COMMAND\n")
                    
            time.sleep(0.01)
            
    def sensor_data_logger(self, sensors, log_interval=1.0):
        """Log sensor data via serial"""
        def logging_worker():
            while self.running:
                readings = {}
                timestamp = time.time()
                
                for sensor_name, pin in sensors.items():
                    readings[sensor_name] = self.digital_read(pin)
                    
                # Format as CSV
                log_entry = f"{timestamp:.3f}"
                for sensor_name, value in readings.items():
                    log_entry += f",{sensor_name}:{value}"
                log_entry += "\n"
                
                self.serial.write(log_entry.encode())
                time.sleep(log_interval)
                
        self.monitor_thread = threading.Thread(target=logging_worker)
        self.monitor_thread.daemon = True
        self.monitor_thread.start()
        
    def start(self):
        """Start controller operations"""
        self.running = True
        
        # Setup common pins
        self.setup_pin(18, PinMode.OUTPUT)  # LED
        self.setup_pin(24, PinMode.INPUT, pull_up_down='up')  # Button/Sensor
        self.setup_pwm(12, 1000)  # PWM output
        
        # Start serial-GPIO bridge
        bridge_thread = threading.Thread(target=self.serial_gpio_bridge)
        bridge_thread.daemon = True
        bridge_thread.start()
        
        print("Raspberry Pi controller started")
        
    def cleanup(self):
        """Clean up resources"""
        self.running = False
        
        if self.monitor_thread:
            self.monitor_thread.join()
            
        # Stop PWM
        for pwm in self.pwm_objects.values():
            pwm.stop()
            
        # Cleanup GPIO
        GPIO.cleanup()
        
        # Close serial
        self.serial.close()
        
        print("Cleanup complete")

# Usage example
controller = RaspberryPiController('/dev/serial0', 115200)

try:
    # Inside the try block so a failure during start-up is still cleaned up
    controller.start()

    # Define sensors to monitor
    sensors = {
        'temperature': 24,
        'motion': 25,
        'light': 23
    }

    # start() only configures pin 24 as an input. Pins that were never set
    # up read back as None, so configure the remaining sensor pins first.
    for pin in sensors.values():
        if pin not in controller.pin_configs:
            controller.setup_pin(pin, PinMode.INPUT)

    # Start sensor logging
    controller.sensor_data_logger(sensors, log_interval=0.5)

    # Manual control examples
    controller.digital_write(18, GPIO.HIGH)  # Turn on LED
    time.sleep(1)
    controller.pwm_write(12, 75)  # Set PWM to 75%

    # Keep running
    input("Press Enter to stop...")

finally:
    controller.cleanup()

IoT and Cloud Integration

MQTT Bridge

import serial
import json
import time
import threading
import paho.mqtt.client as mqtt
from datetime import datetime

class SerialMQTTBridge:
    """Forward serial lines to MQTT and MQTT commands to the serial port.

    Lines read from the port are published on data_topic as JSON; messages
    received on command_topic are translated into serial writes; device
    status (online/offline) is published retained on status_topic.
    """

    # How long stop() waits for the serial reader thread
    _JOIN_TIMEOUT = 2.0

    def __init__(self, serial_port, baudrate, mqtt_broker, mqtt_port=1883):
        """Open the serial port and create (but not connect) the MQTT client."""
        # Serial setup
        self.serial = serial.Serial(serial_port, baudrate, timeout=1)

        # MQTT setup. paho-mqtt 2.x takes the callback API version as the
        # first Client argument; version 1 matches the
        # on_connect(client, userdata, flags, rc) signature used below.
        # paho-mqtt 1.x has no such enum and takes no argument.
        api_versions = getattr(mqtt, 'CallbackAPIVersion', None)
        if api_versions is not None:
            self.mqtt_client = mqtt.Client(api_versions.VERSION1)
        else:
            self.mqtt_client = mqtt.Client()
        self.mqtt_broker = mqtt_broker
        self.mqtt_port = mqtt_port

        # Topics
        self.data_topic = "rpi/serial/data"
        self.command_topic = "rpi/serial/command"
        self.status_topic = "rpi/serial/status"

        # State
        self.running = False
        self.device_id = f"rpi-{int(time.time())}"

    def _status_payload(self, status):
        """Return the JSON status message for *status* ('online'/'offline')."""
        return json.dumps({
            'device_id': self.device_id,
            'status': status,
            'timestamp': datetime.now().isoformat()
        })

    def setup_mqtt(self):
        """Configure MQTT client: callbacks and the retained last-will message."""
        def on_connect(client, userdata, flags, rc):
            print(f"MQTT connected with code {rc}")
            client.subscribe(self.command_topic)

            # Publish online status. Retained like the last will, so a
            # retained "offline" from an earlier session is replaced.
            client.publish(
                self.status_topic,
                self._status_payload('online'),
                retain=True
            )

        def on_message(client, userdata, msg):
            """Handle incoming MQTT commands"""
            try:
                command = json.loads(msg.payload.decode())
            except (UnicodeDecodeError, json.JSONDecodeError):
                # Not JSON text: treat as raw command
                self.serial.write(msg.payload + b'\n')
                return

            if isinstance(command, dict):
                self.handle_mqtt_command(command)
            else:
                # Valid JSON but not an object (number, list, string, ...):
                # there is no 'type' to dispatch on, so forward it unchanged.
                self.serial.write(msg.payload + b'\n')

        self.mqtt_client.on_connect = on_connect
        self.mqtt_client.on_message = on_message

        # Set last will (offline status)
        self.mqtt_client.will_set(
            self.status_topic,
            self._status_payload('offline'),
            retain=True
        )

    def handle_mqtt_command(self, command):
        """Process structured MQTT command.

        Args:
            command: Dict with a 'type' of 'raw', 'gpio' or 'config' plus the
                fields of that type; other types are ignored.
        """
        cmd_type = command.get('type')

        if cmd_type == 'raw':
            # Send raw data to serial; non-string values are stringified
            # instead of failing on the missing .encode()
            data = command.get('data', '')
            self.serial.write(str(data).encode() + b'\n')

        elif cmd_type == 'gpio':
            # GPIO control command
            pin = command.get('pin')
            action = command.get('action')
            value = command.get('value')

            gpio_command = f"GPIO {pin} {action} {value}\n"
            self.serial.write(gpio_command.encode())

        elif cmd_type == 'config':
            # Configuration command
            setting = command.get('setting')
            value = command.get('value')

            config_command = f"CONFIG {setting} {value}\n"
            self.serial.write(config_command.encode())

    def _publish_line(self, line_str):
        """Publish one received serial line on the data topic."""
        message = {
            'device_id': self.device_id,
            'timestamp': datetime.now().isoformat(),
            'data': line_str,
            'raw': True
        }

        # Try to parse as JSON for structured data
        try:
            message['data'] = json.loads(line_str)
            message['raw'] = False
        except json.JSONDecodeError:
            pass

        self.mqtt_client.publish(self.data_topic, json.dumps(message))

    def serial_to_mqtt_worker(self):
        """Forward serial data to MQTT, one message per newline-terminated line."""
        buffer = b''

        while self.running:
            if self.serial.in_waiting:
                buffer += self.serial.read(self.serial.in_waiting)

                # Process complete lines; a partial tail stays in the buffer
                while b'\n' in buffer:
                    line, buffer = buffer.split(b'\n', 1)
                    line_str = line.decode('utf-8', errors='ignore').strip()
                    if line_str:
                        self._publish_line(line_str)

            time.sleep(0.01)

    def start(self):
        """Start the bridge: connect to the broker and launch the reader thread."""
        self.setup_mqtt()

        # Connect to MQTT broker
        self.mqtt_client.connect(self.mqtt_broker, self.mqtt_port, 60)
        self.mqtt_client.loop_start()

        # Start serial reader
        self.running = True
        self.serial_thread = threading.Thread(target=self.serial_to_mqtt_worker)
        self.serial_thread.daemon = True
        self.serial_thread.start()

        print(f"Serial-MQTT bridge started (Device ID: {self.device_id})")

    def stop(self):
        """Stop the bridge and release the MQTT connection and serial port."""
        self.running = False

        try:
            # Publish offline status (retained, matching the last will)
            self.mqtt_client.publish(
                self.status_topic,
                self._status_payload('offline'),
                retain=True
            )

            # Cleanup
            if hasattr(self, 'serial_thread'):
                self.serial_thread.join(timeout=self._JOIN_TIMEOUT)

            self.mqtt_client.loop_stop()
            self.mqtt_client.disconnect()
        finally:
            # The port is closed even if the MQTT teardown raised
            self.serial.close()

        print("Bridge stopped")

# Usage
bridge = SerialMQTTBridge('/dev/serial0', 115200, 'localhost')
bridge.start()

try:
    # Keep bridge running
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    pass
finally:
    # Shut down on Ctrl+C and on any other exception alike
    bridge.stop()

REST API Server

import serial
import json
import threading
import time
from flask import Flask, request, jsonify
from datetime import datetime
from collections import deque

# Flask application object; the @app.route handlers below register on it.
app = Flask(__name__)

class SerialAPIServer:
    """Serial port wrapper with a background reader and a bounded history."""

    def __init__(self, serial_port, baudrate):
        """Open the port (1 s read timeout) and create an empty history."""
        self.serial = serial.Serial(serial_port, baudrate, timeout=1)
        self.data_buffer = deque(maxlen=1000)  # Store last 1000 messages
        self.running = False
        # Guards data_buffer, which the reader thread and callers share
        self.lock = threading.Lock()

    def _store_line(self, line_str):
        """Append one received line, stamped with the current time."""
        entry = {
            'timestamp': datetime.now().isoformat(),
            'data': line_str
        }
        with self.lock:
            self.data_buffer.append(entry)

    def start_serial_reader(self):
        """Start the background thread that reads newline-terminated lines."""
        def reader():
            pending = b''
            while self.running:
                if self.serial.in_waiting:
                    pending += self.serial.read(self.serial.in_waiting)

                    # Keep any partial tail in `pending` for the next pass
                    while b'\n' in pending:
                        line, pending = pending.split(b'\n', 1)
                        line_str = line.decode('utf-8', errors='ignore').strip()
                        if line_str:
                            self._store_line(line_str)

                time.sleep(0.01)

        self.running = True
        self.reader_thread = threading.Thread(target=reader)
        self.reader_thread.daemon = True
        self.reader_thread.start()

    def send_command(self, command):
        """Send command to serial device, followed by a newline.

        Returns:
            True when the write succeeded; False when *command* is not a
            string or the port reported an error.
        """
        if not isinstance(command, str):
            return False

        try:
            self.serial.write(command.encode() + b'\n')
        except (serial.SerialException, OSError):
            return False
        return True

    def get_recent_data(self, count=10):
        """Get recent data messages.

        Args:
            count: Maximum number of messages to return.

        Returns:
            Up to *count* of the newest messages, oldest first. A count of
            zero or less yields an empty list (slicing with [-0:] would
            otherwise return the entire history).
        """
        if count <= 0:
            return []

        with self.lock:
            return list(self.data_buffer)[-count:]

    def stop(self):
        """Stop the reader thread and close the port."""
        self.running = False
        if hasattr(self, 'reader_thread'):
            self.reader_thread.join()
        self.serial.close()

# Initialize serial API
# NOTE: these statements run at import time, so importing this module opens
# /dev/serial0 and starts the background reader thread.
serial_api = SerialAPIServer('/dev/serial0', 115200)
serial_api.start_serial_reader()

@app.route('/api/serial/send', methods=['POST'])
def send_serial_command():
    """Send command to serial device.

    Expects a JSON object body with a string 'command' field. Responds 400
    when the body is missing, is not a JSON object, or has no string command;
    500 when the serial write fails.
    """
    # silent=True yields None for a missing or malformed body instead of an
    # error page, so every bad request gets the same JSON error shape.
    data = request.get_json(silent=True)
    command = data.get('command') if isinstance(data, dict) else None

    if not isinstance(command, str):
        return jsonify({
            'status': 'error',
            'message': "Request body must be a JSON object with a string 'command'"
        }), 400

    if serial_api.send_command(command):
        return jsonify({'status': 'success', 'command': command})
    else:
        return jsonify({'status': 'error', 'message': 'Failed to send command'}), 500

@app.route('/api/serial/data', methods=['GET'])
def get_serial_data():
    """Get recent serial data.

    Query parameters:
        count: Maximum number of messages to return (default 10). Zero or a
            negative number returns an empty list.
    """
    count = request.args.get('count', 10, type=int)

    # A count <= 0 must not reach the history slice: [-0:] selects
    # everything and a negative count drops the newest entries instead.
    data = serial_api.get_recent_data(count) if count > 0 else []
    return jsonify({'status': 'success', 'data': data})

@app.route('/api/serial/stream')
def stream_serial_data():
    """Server-sent events stream of serial data.

    Sends the buffered history first, then every new message as it arrives,
    polling the buffer every 100 ms.
    """
    def generate():
        # The history holds at most 1000 entries, so its length stops
        # growing once it is full and cannot serve as a cursor. Remember the
        # last entry sent instead and resume right after it.
        last_sent = None
        while True:
            snapshot = serial_api.get_recent_data(1000)

            start = 0
            if last_sent is not None:
                # Search from the newest end; entries are the same dict
                # objects on every poll, so identity comparison is enough.
                for idx in range(len(snapshot) - 1, -1, -1):
                    if snapshot[idx] is last_sent:
                        start = idx + 1
                        break
                # Not found: the entry was pushed out of the history, so
                # everything in the snapshot is new.

            for item in snapshot[start:]:
                yield f"data: {json.dumps(item)}\n\n"

            if snapshot:
                last_sent = snapshot[-1]
            time.sleep(0.1)

    return app.response_class(
        generate(),
        mimetype='text/event-stream'
    )

def _read_system_file(path):
    """Return the text of *path*, or None when the file cannot be read."""
    try:
        with open(path, 'rb') as handle:
            return handle.read().decode(errors='replace')
    except OSError:
        return None


@app.route('/api/system/info')
def system_info():
    """Get system information.

    Raspberry Pi specific values are None when the corresponding file is
    missing or unreadable (for example on other hardware).
    """
    import psutil

    model = _read_system_file('/proc/device-tree/model')
    if model is not None:
        model = model.strip('\x00')

    # Look for the "Serial : <hex>" line instead of splitting the whole file,
    # which returned unrelated text when no such line existed.
    serial_number = None
    for line in (_read_system_file('/proc/cpuinfo') or '').splitlines():
        key, _, value = line.partition(':')
        if key.strip() == 'Serial':
            serial_number = value.strip()[:16]
            break

    # The thermal zone file is divided by 1000 to obtain the reported value
    cpu_temp = None
    temp_raw = _read_system_file('/sys/class/thermal/thermal_zone0/temp')
    if temp_raw is not None:
        try:
            cpu_temp = float(temp_raw) / 1000
        except ValueError:
            cpu_temp = None

    # First field of /proc/uptime, kept as text
    uptime_fields = (_read_system_file('/proc/uptime') or '').split()
    uptime = uptime_fields[0] if uptime_fields else None

    return jsonify({
        'raspberry_pi': {
            'model': model,
            'serial_number': serial_number,
            'cpu_temp': cpu_temp,
            'uptime': uptime
        },
        'system': {
            'cpu_percent': psutil.cpu_percent(),
            'memory_percent': psutil.virtual_memory().percent,
            'disk_percent': psutil.disk_usage('/').percent
        }
    })

if __name__ == '__main__':
    try:
        # host='0.0.0.0' listens on all network interfaces. The routes above
        # perform no authentication, so any client that can reach port 5000
        # can write to the serial device.
        app.run(host='0.0.0.0', port=5000, debug=False)
    finally:
        # Stop the reader thread and close the port once the server exits.
        serial_api.stop()

Data Logging and Analysis

import serial
import sqlite3
import json
import threading
import time
import pandas as pd
import matplotlib.pyplot as plt
from datetime import datetime, timedelta

class SerialDataLogger:
    """Log lines from a serial port into SQLite and report on them.

    Every received line is stored in the serial_data table; numeric sensor
    values extracted from it are additionally stored in sensor_readings.
    """

    # Unit characters that may trail a numeric value, e.g. "25.5C" or "40%"
    _UNIT_CHARS = 'C°F%'

    def __init__(self, serial_port, baudrate, db_path='serial_data.db'):
        """Open the serial port (1 s timeout) and create the tables if needed."""
        self.serial = serial.Serial(serial_port, baudrate, timeout=1)
        self.db_path = db_path
        self.running = False

        # Initialize database
        self.init_database()

    def init_database(self):
        """Initialize SQLite database (creates both tables if missing)."""
        conn = sqlite3.connect(self.db_path)
        try:
            with conn:  # commits on success, rolls back on error
                conn.execute('''
                    CREATE TABLE IF NOT EXISTS serial_data (
                        id INTEGER PRIMARY KEY AUTOINCREMENT,
                        timestamp TEXT NOT NULL,
                        raw_data TEXT NOT NULL,
                        parsed_data TEXT,
                        data_type TEXT,
                        created_at DATETIME DEFAULT CURRENT_TIMESTAMP
                    )
                ''')

                conn.execute('''
                    CREATE TABLE IF NOT EXISTS sensor_readings (
                        id INTEGER PRIMARY KEY AUTOINCREMENT,
                        timestamp TEXT NOT NULL,
                        sensor_type TEXT NOT NULL,
                        value REAL NOT NULL,
                        unit TEXT,
                        created_at DATETIME DEFAULT CURRENT_TIMESTAMP
                    )
                ''')
        finally:
            conn.close()

    def parse_sensor_data(self, raw_data):
        """Parse common sensor data formats.

        Tried in order: JSON, "sensor_type,value,unit" CSV, and
        comma-separated "key=value" pairs.

        Returns:
            A (parsed, kind) tuple where kind is 'json', 'csv', 'keyvalue' or
            'unknown'; parsed is None for 'unknown'.
        """
        try:
            # Try JSON format
            return json.loads(raw_data), 'json'
        except json.JSONDecodeError:
            pass

        # Try CSV format: sensor_type,value,unit
        parts = raw_data.split(',')
        if len(parts) == 3:
            try:
                value = float(parts[1].strip())
            except ValueError:
                # Three fields but no numeric middle one (for example three
                # key=value pairs): not this format, keep trying.
                value = None
            if value is not None:
                return {
                    'sensor_type': parts[0].strip(),
                    'value': value,
                    'unit': parts[2].strip()
                }, 'csv'

        # Try key-value format: temp=25.5C
        if '=' in raw_data:
            pairs = {}
            for pair in raw_data.split(','):
                if '=' in pair:
                    key, value = pair.split('=', 1)
                    pairs[key.strip()] = value.strip()
            return pairs, 'keyvalue'

        return None, 'unknown'

    def _sensor_rows(self, timestamp, parsed_data):
        """Return the sensor_readings rows that *parsed_data* gives rise to."""
        if not isinstance(parsed_data, dict):
            return []

        if 'sensor_type' in parsed_data and 'value' in parsed_data:
            try:
                value = float(parsed_data['value'])
            except (TypeError, ValueError):
                # null or non-numeric value: nothing to store as a reading
                return []
            return [(
                timestamp,
                parsed_data['sensor_type'],
                value,
                parsed_data.get('unit', '')
            )]

        # Handle multiple sensors in one message
        rows = []
        for key, value in parsed_data.items():
            text = str(value)
            try:
                numeric_value = float(text.rstrip(self._UNIT_CHARS))
            except ValueError:
                continue  # Skip non-numeric values
            unit = text[-1] if text[-1] in self._UNIT_CHARS else ''
            rows.append((timestamp, key, numeric_value, unit))
        return rows

    def log_data(self, raw_data):
        """Log data to database: the raw line plus any sensor readings in it."""
        timestamp = datetime.now().isoformat()
        parsed_data, data_type = self.parse_sensor_data(raw_data)
        parsed_json = json.dumps(parsed_data) if parsed_data is not None else None

        conn = sqlite3.connect(self.db_path)
        try:
            with conn:  # one transaction per line; rolled back on error
                # Log raw data
                conn.execute('''
                    INSERT INTO serial_data (timestamp, raw_data, parsed_data, data_type)
                    VALUES (?, ?, ?, ?)
                ''', (timestamp, raw_data, parsed_json, data_type))

                # Log parsed sensor readings
                conn.executemany('''
                    INSERT INTO sensor_readings (timestamp, sensor_type, value, unit)
                    VALUES (?, ?, ?, ?)
                ''', self._sensor_rows(timestamp, parsed_data))
        finally:
            conn.close()

    def data_logger_worker(self):
        """Background worker for logging data, one row per received line."""
        buffer = b''

        while self.running:
            if self.serial.in_waiting:
                buffer += self.serial.read(self.serial.in_waiting)

                # Keep any partial tail in the buffer for the next pass
                while b'\n' in buffer:
                    line, buffer = buffer.split(b'\n', 1)
                    line_str = line.decode('utf-8', errors='ignore').strip()

                    if line_str:
                        self.log_data(line_str)
                        print(f"Logged: {line_str}")

            time.sleep(0.01)

    def start_logging(self):
        """Start data logging in a daemon thread."""
        self.running = True
        self.logger_thread = threading.Thread(target=self.data_logger_worker)
        self.logger_thread.daemon = True
        self.logger_thread.start()
        print("Data logging started")

    def stop_logging(self):
        """Stop data logging and close the serial port."""
        self.running = False
        if hasattr(self, 'logger_thread'):
            self.logger_thread.join()
        self.serial.close()
        print("Data logging stopped")

    def generate_report(self, hours=24):
        """Generate analysis report for the last *hours* hours.

        Saves four plots to sensor_report.png, shows them, and prints summary
        statistics. Prints a notice and returns when there is no data.
        """
        end_time = datetime.now()
        start_time = end_time - timedelta(hours=hours)

        # Get sensor data; the connection is released right after the query,
        # including on the early return below.
        conn = sqlite3.connect(self.db_path)
        try:
            df = pd.read_sql_query('''
                SELECT timestamp, sensor_type, value, unit
                FROM sensor_readings
                WHERE timestamp >= ? AND timestamp <= ?
                ORDER BY timestamp
            ''', conn, params=(start_time.isoformat(), end_time.isoformat()))
        finally:
            conn.close()

        if df.empty:
            print("No data found for the specified time range")
            return

        # Convert timestamp to datetime
        df['timestamp'] = pd.to_datetime(df['timestamp'])

        # Generate plots
        fig, axes = plt.subplots(2, 2, figsize=(15, 10))
        fig.suptitle(f'Sensor Data Analysis - Last {hours} Hours')

        # Plot 1: Temperature over time
        temp_data = df[df['sensor_type'] == 'temperature']
        if not temp_data.empty:
            axes[0, 0].plot(temp_data['timestamp'], temp_data['value'])
            axes[0, 0].set_title('Temperature Over Time')
            axes[0, 0].set_ylabel('Temperature (°C)')
            axes[0, 0].tick_params(axis='x', rotation=45)

        # Plot 2: Data distribution
        sensor_counts = df['sensor_type'].value_counts()
        axes[0, 1].pie(sensor_counts.values, labels=sensor_counts.index, autopct='%1.1f%%')
        axes[0, 1].set_title('Sensor Data Distribution')

        # Plot 3: Value ranges by sensor
        df.boxplot(column='value', by='sensor_type', ax=axes[1, 0])
        axes[1, 0].set_title('Value Ranges by Sensor Type')
        axes[1, 0].set_xlabel('Sensor Type')

        # Plot 4: Data frequency over time. A Timedelta rule is used because
        # the 'H' string alias is deprecated in recent pandas releases.
        hourly_counts = df.set_index('timestamp').resample(pd.Timedelta(hours=1)).size()
        axes[1, 1].bar(range(len(hourly_counts)), hourly_counts.values)
        axes[1, 1].set_title('Data Points Per Hour')
        axes[1, 1].set_ylabel('Count')

        plt.tight_layout()
        plt.savefig('sensor_report.png', dpi=300, bbox_inches='tight')
        plt.show()
        # Release the figure so repeated reports do not accumulate memory
        plt.close(fig)

        # Print statistics
        print("\n=== Data Summary ===")
        print(f"Total data points: {len(df)}")
        print(f"Unique sensors: {df['sensor_type'].nunique()}")
        print(f"Time range: {df['timestamp'].min()} to {df['timestamp'].max()}")

        print("\n=== Sensor Statistics ===")
        for sensor_type, sensor_data in df.groupby('sensor_type', sort=False):
            values = sensor_data['value']
            print(f"\n{sensor_type.upper()}:")
            print(f"  Count: {len(sensor_data)}")
            print(f"  Min: {values.min():.2f}")
            print(f"  Max: {values.max():.2f}")
            print(f"  Mean: {values.mean():.2f}")
            print(f"  Std: {values.std():.2f}")

# Example session: capture for a minute, then summarise the last hour
logger = SerialDataLogger(serial_port='/dev/serial0', baudrate=115200)
logger.start_logging()

try:
    # Collect data in the background for 60 seconds
    time.sleep(60)

    # Plot and print statistics for what was captured
    logger.generate_report(hours=1)

except KeyboardInterrupt:
    # Ctrl+C simply ends the session early
    pass
finally:
    # Stop the worker thread and release the port in every case
    logger.stop_logging()

Raspberry Pi provides excellent serial communication capabilities for IoT projects. Combine GPIO control with serial interfaces to create powerful embedded applications.

How is this guide?