
Showing posts with label code.micropython. Show all posts

Wednesday, July 6, 2022

ESP32-C3/MicroPython multithreading exercise: read user input without blocking, using _thread

This exercise gets user input by calling input(). But input() is a blocking function: it stops further execution of the program until the user enters something. In this exercise, reading user input with input() and controlling the onboard RGB LED run in two separate threads, using _thread.

Please note that _thread is currently highly experimental and its API is not yet fully settled.


mpyC3_thread.py
"""
ESP32-C3/MicroPython exercise:
read user input without blocking, using _thread

MicroPython libraries _thread (multithreading support)
https://docs.micropython.org/en/latest/library/_thread.html

This module is highly experimental and its API is not yet fully
settled and not yet described in documentation.

So, basically, this exercise was written by trial and error; run it as is.

Tested on Espressif ESP32-C3-DevKitM-1/micropython v1.19.1
"""
import os
import sys
import time
import _thread
import neopixel
from machine import Pin

# On Espressif ESP32-C3-DevKitM-1:
# The onboard RGB LED (WS2812) is connected to GPIO8
np = neopixel.NeoPixel(Pin(8), 1)

rqs_to_show = False
to_show = ""

print()

print("====================================")
print(sys.implementation[0], os.uname()[3],
      "\nrun on", os.uname()[4])
print("====================================")

# thread to read user input,
# input() will block the program,
# so have to run in another thread.
def input_thread():
    global rqs_to_show
    global to_show
    while True:
        user_input = input()
        to_show = user_input
        rqs_to_show = True
        print("in input_thread() => ", user_input)
        
# thread to change RGB repeatedly.
def rgb_thread():
    while True:
        np[0] = (10, 0, 0)
        np.write()
        time.sleep(0.5)
        np[0] = (0, 10, 0)
        np.write()
        time.sleep(0.5)
        np[0] = (0, 0, 10)
        np.write()
        time.sleep(0.5)

_thread.start_new_thread(input_thread, ())
_thread.start_new_thread(rgb_thread, ())

while True:
    if rqs_to_show:
        rqs_to_show = False
        print("in main thread: rqs_to_show -> ", to_show)

Next:
~ applied in a real exercise: ESP32-C3/MicroPython BLE UART Communication

Sunday, July 3, 2022

MicroPython/ESP32-C3 Exercise: send/receive command via BLE UART

My previous post showed the steps to run the MicroPython Bluetooth (BLE) examples on ESP32-C3. Here the examples are modified to send and receive commands that control the onboard LED remotely.

ble_simple_peripheral_LED.py (modified from ble_simple_peripheral.py) runs on the AI-Thinker NodeMCU ESP-C3-32S-Kit and acts as the BLE peripheral.

ble_simple_central_button.py (modified from ble_simple_central.py) runs on the Espressif ESP32-C3-DevKitM-1 and acts as the BLE central.

Both boards are flashed with MicroPython v1.19.1 firmware. ble_advertising.py has to be saved on both the central and the peripheral MicroPython device.

Once connected, the user presses the central's onboard button to send a command to the peripheral via BLE UART, toggling the peripheral's onboard LED.

On the peripheral side, the onboard LED is turned ON/OFF according to the received command, and the command is sent back to the central to control the central's onboard LED.

Once the central receives the command, it turns its onboard LED ON/OFF accordingly.
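
The round trip described above can be sketched off-device as three small functions. This is only an illustration of the command flow, with no BLE involved: central_next_command(), peripheral_handle() and central_handle_echo() are hypothetical names, while the two command constants and the (3, 3, 0) colour are taken from the listings in this post.

```python
CMD_LEDON = b'LEDON\r\n'
CMD_LEDOFF = b'LEDOFF\r\n'


def central_next_command(current_led_val):
    """Toggle the remembered LED state and pick the command to send."""
    new_val = not current_led_val
    return new_val, (CMD_LEDON if new_val else CMD_LEDOFF)


def peripheral_handle(cmd):
    """Return (led_value, echo) for a known command, else (None, None)."""
    if cmd == CMD_LEDON:
        return 1, CMD_LEDON
    if cmd == CMD_LEDOFF:
        return 0, CMD_LEDOFF
    return None, None


def central_handle_echo(echo):
    """Return the RGB tuple the central shows for an echoed command."""
    if echo == CMD_LEDON:
        return (3, 3, 0)
    if echo == CMD_LEDOFF:
        return (0, 0, 0)
    return None   # unknown data: leave the LED unchanged
```

Because the central starts with its remembered state set to True and a pending "button pressed" flag, the first command it sends after connecting is LEDOFF, which puts both LEDs in a known state.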

related:
MicroPython/NodeMCU ESP-C3-32S-Kit to control onboard LEDs
MicroPython/ESP32-C3-DevKitM-1 exercise: onboard BOOT button, and RGB LED (Neopixel)


ble_simple_peripheral_LED.py

"""
MicroPython(v1.19.1) exercise
run on AI-Thinker NodeMCU ESP-C3-32S-Kit
act as BLE UART peripheral.

Receive command from central, turn on/off onboard LED,
and send back the command to central.

Modified from MicroPython ble_simple_peripheral.py example
https://github.com/micropython/micropython/
blob/master/examples/bluetooth/ble_simple_peripheral.py

"""

# This example demonstrates a UART peripheral.

import bluetooth
import random
import struct
import time
from ble_advertising import advertising_payload
from machine import Pin

from micropython import const

CMD_LEDON = b'LEDON\r\n'
CMD_LEDOFF = b'LEDOFF\r\n'

# NodeMCU ESP-C3-32S-Kit onboard LEDs assignment
pinR = Pin(3, Pin.OUT)
pinG = Pin(4, Pin.OUT)
pinB = Pin(5, Pin.OUT)

_IRQ_CENTRAL_CONNECT = const(1)
_IRQ_CENTRAL_DISCONNECT = const(2)
_IRQ_GATTS_WRITE = const(3)

_FLAG_READ = const(0x0002)
_FLAG_WRITE_NO_RESPONSE = const(0x0004)
_FLAG_WRITE = const(0x0008)
_FLAG_NOTIFY = const(0x0010)

_UART_UUID = bluetooth.UUID(
    "6E400001-B5A3-F393-E0A9-E50E24DCCA9E")
_UART_TX = (
    bluetooth.UUID(
        "6E400003-B5A3-F393-E0A9-E50E24DCCA9E"),
    _FLAG_READ | _FLAG_NOTIFY,
)
_UART_RX = (
    bluetooth.UUID(
        "6E400002-B5A3-F393-E0A9-E50E24DCCA9E"),
    _FLAG_WRITE | _FLAG_WRITE_NO_RESPONSE,
)
_UART_SERVICE = (
    _UART_UUID,
    (_UART_TX, _UART_RX),
)


class BLESimplePeripheral:
    def __init__(self, ble, name="mpy-uart"):
        self._ble = ble
        self._ble.active(True)
        self._ble.irq(self._irq)
        ((self._handle_tx,
          self._handle_rx),) \
          = self._ble.gatts_register_services(
              (_UART_SERVICE,))
        self._connections = set()
        self._write_callback = None
        self._payload = \
                      advertising_payload(
                          name=name, services=[_UART_UUID])
        self._advertise()

    def _irq(self, event, data):
        # Track connections so we can send notifications.
        if event == _IRQ_CENTRAL_CONNECT:
            conn_handle, _, _ = data
            print("New connection", conn_handle)
            self._connections.add(conn_handle)
        elif event == _IRQ_CENTRAL_DISCONNECT:
            conn_handle, _, _ = data
            print("Disconnected", conn_handle)
            self._connections.remove(conn_handle)
            # Start advertising again to allow a new connection.
            self._advertise()
        elif event == _IRQ_GATTS_WRITE:
            conn_handle, value_handle = data
            value = self._ble.gatts_read(value_handle)
            if (value_handle == self._handle_rx and
                self._write_callback):
                self._write_callback(value)

    def send(self, data):
        for conn_handle in self._connections:
            self._ble.gatts_notify(conn_handle,
                                   self._handle_tx,
                                   data)

    def is_connected(self):
        return len(self._connections) > 0

    def _advertise(self, interval_us=500000):
        print("Starting advertising")
        self._ble.gap_advertise(interval_us,
                                adv_data=self._payload)

    def on_write(self, callback):
        self._write_callback = callback

def Turn_LED(val):
    pinR.value(val)
    pinG.value(val)
    pinB.value(val)

def demo():
    ble = bluetooth.BLE()
    p = BLESimplePeripheral(ble)

    def on_rx(v):
        # command received from central,
        # turn ON/OFF LED accordingly,
        # and send back the command to central.
        print("RX", v)
        
        if v == CMD_LEDON:
            Turn_LED(1)
            print("command received: ", CMD_LEDON)
            p.send("from peripheral:")
            p.send(CMD_LEDON)
        elif v == CMD_LEDOFF:
            Turn_LED(0)
            print("command received: ", CMD_LEDOFF)
            p.send("from peripheral:")
            p.send(CMD_LEDOFF)

    p.on_write(on_rx)

    i = 0
    while True:
        """
        if p.is_connected():
            # Short burst of queued notifications.
            for _ in range(3):
                data = str(i) + "_"
                print("TX", data)
                p.send(data)
                i += 1
        """
        time.sleep_ms(100)


if __name__ == "__main__":
    demo()

ble_simple_central_button.py
"""
MicroPython(v1.19.1) exercise
run on Espressif ESP32-C3-DevKitM-1 
act as BLE UART central.

Detect onboard BOOT button,
send command to peripheral to toggle peripheral onboard LED.
and receive command from peripheral, turn on/off onboard RGB LED.

* No debouncing for BOOT button detection here.

Modified from MicroPython ble_simple_central.py example
https://github.com/micropython/micropython/
blob/master/examples/bluetooth/ble_simple_central.py

"""

# This example finds and connects to a peripheral running the
# UART service (e.g. ble_simple_peripheral.py).

import bluetooth
import random
import struct
import time
import micropython
import machine
import neopixel

from ble_advertising import decode_services, decode_name

from micropython import const

CMD_LEDON = b'LEDON\r\n'
CMD_LEDOFF = b'LEDOFF\r\n'

button_BOOT = machine.Pin(9,
                          machine.Pin.IN,
                          machine.Pin.PULL_UP)
np = neopixel.NeoPixel(machine.Pin(8), 1)

# To turn OFF peripheral LED in first power-up
current_led_val = True
root_button_pressed = True

# Turn OFF onboard RGB
np[0] = (0, 0, 0)
np.write()

def boot_pressed_handler(pin):
    global root_button_pressed
    root_button_pressed = True
    
button_BOOT.irq(trigger=machine.Pin.IRQ_FALLING,
                handler=boot_pressed_handler)

_IRQ_CENTRAL_CONNECT = const(1)
_IRQ_CENTRAL_DISCONNECT = const(2)
_IRQ_GATTS_WRITE = const(3)
_IRQ_GATTS_READ_REQUEST = const(4)
_IRQ_SCAN_RESULT = const(5)
_IRQ_SCAN_DONE = const(6)
_IRQ_PERIPHERAL_CONNECT = const(7)
_IRQ_PERIPHERAL_DISCONNECT = const(8)
_IRQ_GATTC_SERVICE_RESULT = const(9)
_IRQ_GATTC_SERVICE_DONE = const(10)
_IRQ_GATTC_CHARACTERISTIC_RESULT = const(11)
_IRQ_GATTC_CHARACTERISTIC_DONE = const(12)
_IRQ_GATTC_DESCRIPTOR_RESULT = const(13)
_IRQ_GATTC_DESCRIPTOR_DONE = const(14)
_IRQ_GATTC_READ_RESULT = const(15)
_IRQ_GATTC_READ_DONE = const(16)
_IRQ_GATTC_WRITE_DONE = const(17)
_IRQ_GATTC_NOTIFY = const(18)
_IRQ_GATTC_INDICATE = const(19)

_ADV_IND = const(0x00)
_ADV_DIRECT_IND = const(0x01)
_ADV_SCAN_IND = const(0x02)
_ADV_NONCONN_IND = const(0x03)

_UART_SERVICE_UUID = bluetooth.UUID(
    "6E400001-B5A3-F393-E0A9-E50E24DCCA9E")
_UART_RX_CHAR_UUID = bluetooth.UUID(
    "6E400002-B5A3-F393-E0A9-E50E24DCCA9E")
_UART_TX_CHAR_UUID = bluetooth.UUID(
    "6E400003-B5A3-F393-E0A9-E50E24DCCA9E")


class BLESimpleCentral:
    def __init__(self, ble):
        self._ble = ble
        self._ble.active(True)
        self._ble.irq(self._irq)

        self._reset()

    def _reset(self):
        # Cached name and address from a successful scan.
        self._name = None
        self._addr_type = None
        self._addr = None

        # Callbacks for completion of various operations.
        # These reset back to None after being invoked.
        self._scan_callback = None
        self._conn_callback = None
        self._read_callback = None

        # Persistent callback for when new data is
        # notified from the device.
        self._notify_callback = None

        # Connected device.
        self._conn_handle = None
        self._start_handle = None
        self._end_handle = None
        self._tx_handle = None
        self._rx_handle = None

    def _irq(self, event, data):
        if event == _IRQ_SCAN_RESULT:
            addr_type, addr, adv_type, rssi, adv_data = data
            if (adv_type in (_ADV_IND, _ADV_DIRECT_IND) and
                _UART_SERVICE_UUID in decode_services(adv_data)):
                # Found a potential device, remember it
                # and stop scanning.
                self._addr_type = addr_type
                self._addr = bytes(
                    addr
                )  # Note: addr buffer is owned by caller so
                   # need to copy it.
                self._name = decode_name(adv_data) or "?"
                self._ble.gap_scan(None)

        elif event == _IRQ_SCAN_DONE:
            if self._scan_callback:
                if self._addr:
                    # Found a device during the scan
                    # (and the scan was explicitly stopped).
                    self._scan_callback(self._addr_type,
                                        self._addr,
                                        self._name)
                    self._scan_callback = None
                else:
                    # Scan timed out.
                    self._scan_callback(None, None, None)

        elif event == _IRQ_PERIPHERAL_CONNECT:
            # Connect successful.
            conn_handle, addr_type, addr = data
            if addr_type == self._addr_type and addr == self._addr:
                self._conn_handle = conn_handle
                self._ble.gattc_discover_services(self._conn_handle)

        elif event == _IRQ_PERIPHERAL_DISCONNECT:
            # Disconnect (either initiated by us or the remote end).
            conn_handle, _, _ = data
            if conn_handle == self._conn_handle:
                # If it was initiated by us, it'll already be reset.
                self._reset()

        elif event == _IRQ_GATTC_SERVICE_RESULT:
            # Connected device returned a service.
            conn_handle, start_handle, end_handle, uuid = data
            print("service", data)
            if conn_handle == self._conn_handle and uuid == _UART_SERVICE_UUID:
                self._start_handle, self._end_handle = start_handle, end_handle

        elif event == _IRQ_GATTC_SERVICE_DONE:
            # Service query complete.
            if self._start_handle and self._end_handle:
                self._ble.gattc_discover_characteristics(
                    self._conn_handle,
                    self._start_handle,
                    self._end_handle
                )
            else:
                print("Failed to find uart service.")

        elif event == _IRQ_GATTC_CHARACTERISTIC_RESULT:
            # Connected device returned a characteristic.
            conn_handle, def_handle, value_handle, properties, uuid = data
            if (conn_handle == self._conn_handle and
                uuid == _UART_RX_CHAR_UUID):
                self._rx_handle = value_handle
            if (conn_handle == self._conn_handle and
                uuid == _UART_TX_CHAR_UUID):
                self._tx_handle = value_handle

        elif event == _IRQ_GATTC_CHARACTERISTIC_DONE:
            # Characteristic query complete.
            if self._tx_handle is not None and self._rx_handle is not None:
                # We've finished connecting and discovering device,
                # fire the connect callback.
                if self._conn_callback:
                    self._conn_callback()
            else:
                print("Failed to find uart rx characteristic.")

        elif event == _IRQ_GATTC_WRITE_DONE:
            conn_handle, value_handle, status = data
            print("TX complete")

        elif event == _IRQ_GATTC_NOTIFY:
            conn_handle, value_handle, notify_data = data
            if (conn_handle == self._conn_handle
                and value_handle == self._tx_handle):
                if self._notify_callback:
                    self._notify_callback(notify_data)

    # Returns true if we've successfully connected and
    # discovered characteristics.
    def is_connected(self):
        return (
            self._conn_handle is not None
            and self._tx_handle is not None
            and self._rx_handle is not None
        )

    # Find a device advertising the UART service.
    def scan(self, callback=None):
        self._addr_type = None
        self._addr = None
        self._scan_callback = callback
        self._ble.gap_scan(2000, 30000, 30000)

    # Connect to the specified device
    # (otherwise use cached address from a scan).
    def connect(self, addr_type=None, addr=None, callback=None):
        self._addr_type = addr_type or self._addr_type
        self._addr = addr or self._addr
        self._conn_callback = callback
        if self._addr_type is None or self._addr is None:
            return False
        self._ble.gap_connect(self._addr_type, self._addr)
        return True

    # Disconnect from current device.
    def disconnect(self):
        if not self._conn_handle:
            return
        self._ble.gap_disconnect(self._conn_handle)
        self._reset()

    # Send data over the UART
    def write(self, v, response=False):
        if not self.is_connected():
            return
        self._ble.gattc_write(self._conn_handle,
                              self._rx_handle, v,
                              1 if response else 0)

    # Set handler for when data is received over the UART.
    def on_notify(self, callback):
        self._notify_callback = callback
    
def demo():
    global root_button_pressed
    global current_led_val
    
    ble = bluetooth.BLE()
    central = BLESimpleCentral(ble)

    not_found = False
    
    def send_CMD(cmd):
        try:
            central.write(cmd, with_response)
        except Exception:
            print("TX failed")

    def on_scan(addr_type, addr, name):
        if addr_type is not None:
            print("Found peripheral:", addr_type, addr, name)
            central.connect()
        else:
            nonlocal not_found
            not_found = True
            print("No peripheral found.")

    central.scan(callback=on_scan)

    # Wait for connection...
    while not central.is_connected():
        time.sleep_ms(100)
        if not_found:
            return

    print("Connected")
    
    def on_rx(v):
        # command received from peripheral,
        # update onboard RGB accordingly.
        print("RX", v)
        
        #convert memoryview to str
        cmd = str(v,'utf8')
        print(cmd)
        
        if v == CMD_LEDON:
            np[0] = (3, 3, 0)
            np.write()
        elif v == CMD_LEDOFF:
            np[0] = (0, 0, 0)
            np.write()
            

    central.on_notify(on_rx)

    with_response = False

    while central.is_connected():
        
        if root_button_pressed:
            # BOOT button pressed,
            # send command to peripheral to toggle LED
            root_button_pressed = False
            current_led_val = not current_led_val
            print("- root_button_pressed -", current_led_val)
            
            if current_led_val:
                send_CMD(CMD_LEDON)
                
            else:
                send_CMD(CMD_LEDOFF)

        time.sleep_ms(400 if with_response else 30)

    print("Disconnected")


if __name__ == "__main__":
    demo()

Saturday, July 2, 2022

MicroPython/ESP32-C3-DevKitM-1 exercise: onboard BOOT button, and RGB LED (Neopixel).


Run on an Espressif ESP32-C3-DevKitM-1 with MicroPython v1.19.1 (2022-06-18 build) installed, the following exercises detect the onboard BOOT button and control the onboard RGB LED (Neopixel).


mpyESP32-C3-DevKitM-1_neopixel.py
Simple testing on onboard RGB LED (Neopixel).
import machine
import time
import neopixel

"""
MicroPython v1.19.1/ESP32-C3-DevKitM-1 exercise:
Simple testing on onboard RGB LED (Neopixel).
"""

# On Espressif ESP32-C3-DevKitM-1:
# The onboard RGB LED (WS2812) is connected to GPIO8

np = neopixel.NeoPixel(machine.Pin(8), 1)

while True:
    np[0] = (0, 0, 0)
    np.write()
    time.sleep(1)
    np[0] = (255, 0, 0)
    np.write()
    time.sleep(1)
    np[0] = (0, 255, 0)
    np.write()
    time.sleep(1)
    np[0] = (0, 0, 255)
    np.write()
    time.sleep(1)
    np[0] = (255, 255, 255)
    np.write()
    time.sleep(1)
    

mpyESP32-C3-DevKitM-1_neopixel_2.py
Control onboard RGB LED (Neopixel), with level control.
import machine
import time
import neopixel

"""
MicroPython v1.19.1/ESP32-C3-DevKitM-1 exercise:
Control onboard RGB LED (Neopixel), with level control.
"""

# On Espressif ESP32-C3-DevKitM-1:
# The onboard RGB LED (WS2812) is connected to GPIO8

np = neopixel.NeoPixel(machine.Pin(8), 1)

def setNeoPixel(level, enable):
    np[0] = (level * enable[0],
             level * enable[1],
             level * enable[2])
    np.write()
    
def testNeoPixel(enable):
    for l in range(0, 256):
        setNeoPixel(l, enable)
        time.sleep(0.02)

while True:
    np[0] = (0, 0, 0)
    np.write()
    time.sleep(1)

    testNeoPixel([True, False, False])
    testNeoPixel([False, True, False])
    testNeoPixel([False, False, True])
    
    testNeoPixel([True, True, False])
    testNeoPixel([False, True, True])
    testNeoPixel([True, False, True])
    
    testNeoPixel([True, True, True])
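
The level control above multiplies a brightness level by a True/False flag per channel. A minimal off-device sketch of that calculation is shown here; scaled_color() is a hypothetical helper, and the clamp to 0..255 is an addition that the listing does not have (it never passes values outside that range).

```python
def scaled_color(level, enable):
    """Return an (r, g, b) tuple with `level` on each enabled channel."""
    level = max(0, min(255, int(level)))   # clamp to the 8-bit range
    return tuple(level if on else 0 for on in enable)
```

For example, scaled_color(128, [True, False, True]) gives the tuple that setNeoPixel(128, [True, False, True]) would write to np[0].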


mpyESP32-C3-DevKitM-1_button.py
Simple test onboard BOOT button, and verify the logic.
import machine
import time

"""
MicroPython v1.19.1/ESP32-C3-DevKitM-1 exercise:
Simple test onboard BOOT button, and verify the logic.
"""

# On Espressif ESP32-C3-DevKitM-1:
# The onboard BOOT Button is connected to GPIO9

button_BOOT = machine.Pin(9,
                          machine.Pin.IN,
                          machine.Pin.PULL_UP)

while True:
    time.sleep(0.5)
    print(button_BOOT.value())

mpyESP32-C3-DevKitM-1_button_neopixel.py
Read BOOT button and turn on/off onboard RGB accordingly.
import machine
import time
import neopixel

"""
MicroPython v1.19.1/ESP32-C3-DevKitM-1 exercise:
Read BOOT button and turn on/off onboard RGB accordingly.
"""

# On Espressif ESP32-C3-DevKitM-1:
# The onboard RGB LED (WS2812) is connected to GPIO8
# The onboard BOOT Button is connected to GPIO9

button_BOOT = machine.Pin(9,
                          machine.Pin.IN,
                          machine.Pin.PULL_UP)
np = neopixel.NeoPixel(machine.Pin(8), 1)

while True:
    time.sleep(0.2)
    if (button_BOOT.value()):  # button released
        np[0] = (0, 0, 0)
    else:                      # button pressed
        np[0] = (0, 3, 0)
    np.write()

mpyESP32-C3-DevKitM-1_button_irq.py
Implement IRQ handler to detect BOOT button pressing, and toggle onboard RGB.
import machine
import time
import neopixel

"""
MicroPython v1.19.1/ESP32-C3-DevKitM-1 exercise:
Implement IRQ handler to detect BOOT button pressing,
and toggle onboard RGB.

* No debouncing for button detection here.
"""

# On Espressif ESP32-C3-DevKitM-1:
# The onboard RGB LED (WS2812) is connected to GPIO8
# The onboard BOOT Button is connected to GPIO9

button_BOOT = machine.Pin(9,
                          machine.Pin.IN,
                          machine.Pin.PULL_UP)
np = neopixel.NeoPixel(machine.Pin(8), 1)

np[0] = (0, 0, 0)
last_np_state = False
def toggle_LED():
    global last_np_state
    last_np_state = not last_np_state

    if last_np_state:
        np[0] = (0, 0, 5)
    else:
        np[0] = (0, 0, 0)
    np.write()
    
def boot_pressed_handler(pin):
    print("BOOT button pressed:\t", pin)
    toggle_LED()
    
button_BOOT.irq(trigger=machine.Pin.IRQ_FALLING,
                handler=boot_pressed_handler)

while True:
    pass
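
The IRQ exercise above notes that it does no debouncing, so one physical press may toggle the LED several times. One common approach, sketched here under stated assumptions, is to ignore edges that arrive too soon after the last accepted one. Debouncer and its 200 ms default are hypothetical, and timestamps are passed in so the logic can be tried on a desktop. On the board the timestamp would come from time.ticks_ms(), and the plain subtraction should be replaced by time.ticks_diff() because the tick counter wraps around.

```python
class Debouncer:
    """Accept a press only if enough time has passed since the last one."""

    def __init__(self, min_gap_ms=200):
        self.min_gap_ms = min_gap_ms
        self._last_ms = None    # time of the last accepted press

    def accept(self, now_ms):
        """Return True for a real press, False for a likely contact bounce."""
        if (self._last_ms is not None
                and now_ms - self._last_ms < self.min_gap_ms):
            return False
        self._last_ms = now_ms
        return True
```

Inside boot_pressed_handler(), the call to toggle_LED() would then run only when accept() returns True.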


Wednesday, May 4, 2022

ESP32-C3/MicroPython exercise: update time using ntptime

A MicroPython (v1.18) exercise run on ESP32-C3-DevKitM-1 to update the time using ntptime.

"""
MicroPython/ESP32C3 exercise run on ESP32-C3-DevKitM-1,
about time.
"""
import uos
import usys
import time

import network
import ntptime

TIME_OFFSET = 8 * 60 * 60   # offset in seconds for your time zone (UTC+8 here)

print("\n====================================")
print(usys.implementation[0], uos.uname()[3],
      "\nrun on", uos.uname()[4])
print("====================================\n")

def connect_and_update_ntptime():
    wlan = network.WLAN(network.STA_IF)
    wlan.active(True)
    wlan.disconnect()
    time.sleep(1)
    if not wlan.isconnected():
        print('connecting to network...')
        wlan.connect('ssid', 'password')
        while not wlan.isconnected():
            pass
    print('network config:', wlan.ifconfig())
    
    ntptime.settime()
    wlan.disconnect()

connect_and_update_ntptime()
now_localtime = time.localtime(time.time() + TIME_OFFSET)
print(now_localtime)
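
The time-zone handling above is just an addition of seconds before the timestamp is split into fields. A small sketch of that arithmetic, runnable on desktop Python, is given here. local_time_tuple() is a hypothetical helper, and it uses time.gmtime() instead of time.localtime() so that a desktop machine's own time zone does not get applied a second time. Note that the MicroPython ESP32 port counts seconds from 2000-01-01 while desktop Python counts from 1970-01-01, so the raw numbers differ between the two even though the offset arithmetic is the same.

```python
import time


def local_time_tuple(utc_seconds, offset_hours):
    """Shift a UTC timestamp by a whole-hour offset and split it into fields."""
    return time.gmtime(utc_seconds + offset_hours * 60 * 60)
```

With offset_hours=8 this matches what the listing does with TIME_OFFSET; a negative value covers zones west of UTC.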



Thursday, November 25, 2021

RPi Pico/MicroPython + ESP-C3-12F (as a WiFi co-processor using AT Command)


This is an exercise in which a Raspberry Pi Pico running MicroPython works with an ESP-C3-12F module. The ESP-C3-12F, preloaded with AT command firmware (AT version:2.2.0.0 (s-90458f0 - ESP32C3 - Jun 18 2021 10:24:22)), acts as a WiFi co-processor. The Raspberry Pi Pico controls the ESP-C3-12F using AT commands, to connect and send data to TCP and UDP servers running on a Raspberry Pi 4B/Python.


Connection:

It is strongly recommended to use a separate 3V3 power supply for the ESP-C3-12F.


    Separate
    Power Supply    ESP-C3-12F            Raspberry Pi Pico

                    +-------------+       +----------------
    3V3 ------------|VCC       RX0|-------| GP0 (UART TX)
                    |          TX0|-------| GP1 (UART RX)
                    |             |       |
                    |          EN |-------| GP2
                    |     GND     |       |
                    +------+------+       |
                           |              |
    GND -------------------+--------------| GND
                                          +----------------

Exercise Code:

mpyPico_ESP_AT_TCP.py, a simple program to try AT commands on the ESP.
import usys, uos
import machine
import utime

class color:
    BLACK =   '\033[1;30;48m'
    RED =     '\033[1;31;48m'
    GREEN =   '\033[1;32;48m'
    YELLOW =  '\033[1;33;48m'
    BLUE =    '\033[1;34;48m'
    MAGENTA = '\033[1;35;48m'
    CYAN =    '\033[1;36;48m'
    END =    '\033[1;37;0m'

print("====================================")
print(usys.implementation[0], uos.uname()[3],
      "\nrun on", uos.uname()[4])
print("------------------------------------")


esp_reset = machine.Pin(2, machine.Pin.OUT)

#uart_esp =machine.UART(0, baudrate=115200)
uart_esp = machine.UART(0, timeout=1000)
#print("UART(0) connected to ESP: \n", uart_esp, "\n")

#==================================
"""
esp_sendCMD_waitResp: send command to ESP, and wait for response
if targetResp is caught, return True
otherwise return False
"""
def esp_sendCMD_waitResp(cmd, uart=uart_esp, timeout=2000, targetResp="OK"):
    print(color.MAGENTA + cmd + color.END)
    uart.write(cmd)
    return esp_waitResp(uart, timeout, targetResp)

"""
esp_waitResp: wait for ESP response
if targetResp is caught, return True
otherwise return False
"""
def esp_waitResp(uart=uart_esp, timeout=2000, targetResp="OK"):
    targetCatched = False
    prvMills = utime.ticks_ms()
    print(color.BLUE)
    while (utime.ticks_diff(utime.ticks_ms(), prvMills))<timeout:
        line=uart.readline()
        if line is not None:
            try:
                line_decoded = line.strip().decode()
                if line_decoded == targetResp:
                    print(color.GREEN + line_decoded)
                    targetCatched = True
                    break
                
                #more checking for Response
                elif line_decoded == "OK":
                    print(color.GREEN + line_decoded)
                    break
                elif line_decoded == "ERROR":
                    print(color.RED + line_decoded)
                    break
                
                elif line_decoded == "SEND OK":
                    print(color.GREEN + line_decoded)
                    break
                elif line_decoded == "SEND FAIL":
                    print(color.RED + line_decoded)
                    break
                
                else:
                    print(line_decoded)
            except UnicodeError:
                print(line)
    
    print(color.END)
    return targetCatched
    
# In my tests, something like
# "################################################"
# follows 'ready' after AT+RST/AT+RESTORE or a hardware reset.
# Just do a dummy wait for a response to clear it.
def esp_waitDummtResp(uart=uart_esp, timeout=2000):
    esp_waitResp(uart=uart, timeout=timeout)

# A dummy infinite loop
# to monitor any data sent from ESP via UART
def esp_dummyMonitor(uart=uart_esp):
    while True:
        line=uart.readline()
        if line is not None:
            try:
                line_decoded = line.strip().decode()
                print(line_decoded)
            except UnicodeError:
                print(line)
#==================================

print()
print("=== Start ===")


#hardware reset ESP
print("Hardware reset")
esp_reset.value(1)
utime.sleep(0.5)
esp_reset.value(0)
utime.sleep(0.5)
esp_reset.value(1)

print("wait 'ready' from Hardware Reset\n")
esp_waitResp(targetResp='ready')
esp_waitDummtResp()

esp_sendCMD_waitResp('AT\r\n')          #Test AT startup

esp_sendCMD_waitResp('AT+GMR\r\n')      #Check version information

esp_sendCMD_waitResp('AT+RESTORE\r\n')
esp_waitResp(targetResp='ready')    #wait ready
esp_waitDummtResp()

esp_sendCMD_waitResp('AT+CIPAPMAC?\r\n')  #Query MAC address of ESP SoftAP
esp_sendCMD_waitResp('AT+CIPSTAMAC?\r\n') #Query MAC address of ESP station

esp_sendCMD_waitResp('AT+CWMODE?\r\n')  #Query the Wi-Fi mode
esp_sendCMD_waitResp('AT+CWMODE=2\r\n') #Set the Wi-Fi mode = SoftAP mode
esp_sendCMD_waitResp('AT+CWMODE?\r\n')  #Query the Wi-Fi mode again

esp_sendCMD_waitResp('AT+CIPMUX=1\r\n') #Enable multiple connections
esp_sendCMD_waitResp('AT+CIPMUX?\r\n')

esp_sendCMD_waitResp('AT+CWSAP="esp","password",5,3\r\n')  #config ESP SoftAP
esp_sendCMD_waitResp('AT+CWSAP?\r\n')

esp_sendCMD_waitResp('AT+CIPAP?\r\n')  #Query the IP address of the ESP SoftAP

esp_dummyMonitor()

print("\n~ bye ~\n")
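
The response matching in esp_waitResp() can be tried without any hardware. The following is a minimal desktop-Python sketch of the same loop, not the code run on the Pico: FakeUART is a hypothetical stand-in for machine.UART that replays canned lines, wait_response() is a hypothetical name, and time.monotonic() replaces utime.ticks_ms()/utime.ticks_diff(), which desktop Python does not have. Unlike the listing, it returns the lines it saw instead of printing them in colour.

```python
import time

# Lines that end a command even when they are not the expected target.
TERMINATORS = ("OK", "ERROR", "SEND OK", "SEND FAIL")


class FakeUART:
    """Stand-in for machine.UART that replays canned lines (hypothetical)."""

    def __init__(self, lines):
        self._lines = list(lines)

    def readline(self):
        # Like a UART read that times out, return None when nothing is left.
        return self._lines.pop(0) if self._lines else None


def wait_response(uart, target="OK", timeout_s=2.0):
    """Return (matched, seen): whether target arrived, and the decoded lines."""
    seen = []
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        line = uart.readline()
        if line is None:
            continue
        try:
            text = line.strip().decode()
        except UnicodeError:
            continue                  # skip undecodable noise
        seen.append(text)
        if text == target:
            return True, seen
        if text in TERMINATORS:
            return False, seen        # command finished, but not with target
    return False, seen
```

Returning the collected lines makes it easier to pick values out of a reply later, for example the version text that follows AT+GMR.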

mpyPico_ESP_AT_ScanAP.py, scans for access points.
"""
Raspberry Pi Pico/MicroPython + ESP-C3-12F exercise

ESP-C3-12F with AT-command firmware:
AT version:2.2.0.0(s-90458f0 - ESP32C3 - Jun 18 2021 10:24:22)

Scan Access Point
"""

import usys, uos
import machine
import utime

class color:
    BLACK =   '\033[1;30;48m'
    RED =     '\033[1;31;48m'
    GREEN =   '\033[1;32;48m'
    YELLOW =  '\033[1;33;48m'
    BLUE =    '\033[1;34;48m'
    MAGENTA = '\033[1;35;48m'
    CYAN =    '\033[1;36;48m'
    END =     '\033[1;37;0m'

print("====================================")
print(usys.implementation[0], uos.uname()[3],
      "\nrun on", uos.uname()[4])
print("------------------------------------")


esp_reset = machine.Pin(2, machine.Pin.OUT)

uart_esp = machine.UART(0, timeout=1000)
#print("UART(0) connected to ESP: \n", uart_esp, "\n")

#==================================
"""
esp_sendCMD_waitResp: send command to ESP, and wait for response
if targetResp is caught, return True
otherwise return False
"""
def esp_sendCMD_waitResp(cmd, uart=uart_esp, timeout=2000, targetResp="OK"):
    print(color.MAGENTA + cmd + color.END)
    uart.write(cmd)
    return esp_waitResp(uart, timeout, targetResp)

"""
esp_waitResp: wait for ESP response
if targetResp is caught, return True
otherwise return False
"""
def esp_waitResp(uart=uart_esp, timeout=2000, targetResp="OK"):
    targetCatched = False
    prvMills = utime.ticks_ms()
    print(color.BLUE)
    while (utime.ticks_diff(utime.ticks_ms(), prvMills))<timeout:
        line=uart.readline()
        if line is not None:
            try:
                line_decoded = line.strip().decode()
                if line_decoded == targetResp:
                    print(color.GREEN + line_decoded)
                    targetCatched = True
                    break
                
                #more checking for Response
                elif line_decoded == "OK":
                    print(color.GREEN + line_decoded)
                    break
                elif line_decoded == "ERROR":
                    print(color.RED + line_decoded)
                    break
                
                elif line_decoded == "SEND OK":
                    print(color.GREEN + line_decoded)
                    break
                elif line_decoded == "SEND FAIL":
                    print(color.RED + line_decoded)
                    break
                
                else:
                    print(line_decoded)
            except UnicodeError:
                print(line)
    
    print(color.END)
    return targetCatched

# In my tests, something like
# "################################################"
# follows 'ready' after AT+RST/AT+RESTORE or a hardware reset.
# Just do a dummy wait for a response to clear it.
def esp_waitDummtResp(uart=uart_esp, timeout=2000):
    esp_waitResp(uart=uart, timeout=timeout)

#A dummy infinite loop
#to monitor any data sent from ESP via UART
def esp_dummyMonitor(uart=uart_esp):
    while True:
        line=uart.readline()
        if line is not None:
            try:
                line_decoded = line.strip().decode()
                print(line_decoded)
            except UnicodeError:
                print(line)
#==================================

print()
print("=== Start ===")

#hardware reset ESP
print("Hardware reset")
esp_reset.value(1)
utime.sleep(0.5)
esp_reset.value(0)
utime.sleep(0.5)
esp_reset.value(1)

print("wait 'ready' from Hardware Reset\n")
esp_waitResp(targetResp='ready')
esp_waitDummtResp()

esp_sendCMD_waitResp('AT\r\n')          #Test AT startup

esp_sendCMD_waitResp('AT+GMR\r\n')      #Check version information

esp_sendCMD_waitResp('AT+RESTORE\r\n')
esp_waitResp(targetResp='ready')    #wait ready
esp_waitDummtResp()

esp_sendCMD_waitResp('AT+CIPAPMAC?\r\n')  #Query MAC address of ESP SoftAP
esp_sendCMD_waitResp('AT+CIPSTAMAC?\r\n') #Query MAC address of ESP station

esp_sendCMD_waitResp('AT+CWMODE=1\r\n') #Set the Wi-Fi mode = Station mode
esp_sendCMD_waitResp('AT+CWMODE?\r\n')  #Query the Wi-Fi mode again

preScanTime = utime.ticks_ms()
esp_sendCMD_waitResp('AT+CWLAP\r\n', timeout=10000)  #List Available APs
print("Time used to Scan AP: ",
      utime.ticks_diff(utime.ticks_ms(), preScanTime),
      "(ms)")

print("\n~ bye ~\n");
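The AT+CWLAP response comes back as one line per access point. Below is a minimal sketch of splitting such a line into fields, assuming the default `+CWLAP:(<ecn>,<ssid>,<rssi>,<mac>,<channel>,...)` layout from the ESP-AT documentation. `parse_cwlap` is a hypothetical helper, not part of the exercise above, and the sample line is made up.

```python
def parse_cwlap(line):
    """Parse one '+CWLAP:(...)' line into a dict, or return None.

    Assumption: the first five fields are ecn, ssid, rssi, mac, channel,
    and the SSID itself does not contain the sequence '",'.
    """
    prefix = '+CWLAP:('
    if not (line.startswith(prefix) and line.endswith(')')):
        return None
    body = line[len(prefix):-1]
    # ecn comes first, then the quoted SSID, then the remaining fields
    ecn, rest = body.split(',', 1)
    if not rest.startswith('"'):
        return None
    end = rest.find('",')
    if end < 0:
        return None
    ssid = rest[1:end]
    fields = rest[end + 2:].split(',')
    if len(fields) < 3:
        return None
    return {
        'ecn': int(ecn),
        'ssid': ssid,
        'rssi': int(fields[0]),
        'mac': fields[1].strip('"'),
        'channel': int(fields[2]),
    }

# made-up sample line, for illustration only
sample = '+CWLAP:(3,"my ap",-52,"aa:bb:cc:dd:ee:ff",6,-1,-1,4,4,7,0)'
print(parse_cwlap(sample))
```

Lines that do not start with `+CWLAP:(` (such as `OK`) return None, so the function could be called on every decoded line inside esp_waitResp().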

mpyPico_ESP_AT_TCP_Client.py, a TCP client that connects to the TCP server (pyMyTCPServer.py, listed below) running on Raspberry Pi/Python.
"""
Raspberry Pi Pico/MicroPython + ESP-C3-12F exercise

ESP-C3-12F with AT-command firmware:
AT version:2.2.0.0(s-90458f0 - ESP32C3 - Jun 18 2021 10:24:22)

Pico send AT command to ESP-C3-12F via UART,
- set in station mode
- join AP
- connect to server ip:port 9999
- send text and wait response

Modified from an exercise on my other blog
https://helloraspberrypi.blogspot.com/2021/02/
picomicropython-esp-01s-at-command-act.html
"""

import usys, uos
import machine
import utime

class color:
    BLACK =   '\033[1;30;48m'
    RED =     '\033[1;31;48m'
    GREEN =   '\033[1;32;48m'
    YELLOW =  '\033[1;33;48m'
    BLUE =    '\033[1;34;48m'
    MAGENTA = '\033[1;35;48m'
    CYAN =    '\033[1;36;48m'
    END =     '\033[1;37;0m'
   
#server port & ip hard-coded,
#have to match with server side setting
server_ip="192.168.120.147"
server_port=9999

print("====================================")
print(usys.implementation[0], uos.uname()[3],
      "\nrun on", uos.uname()[4])
print("------------------------------------")


esp_reset = machine.Pin(2, machine.Pin.OUT)

#uart_esp =machine.UART(0, baudrate=115200)
uart_esp =machine.UART(0, timeout=1000)
#print("UART(0) connected to ESP: \n", uart_esp, "\n")

#==================================
"""
esp_sendCMD_waitResp: send command to ESP, and wait for the response
if targetResp is caught, return True
otherwise return False
"""
def esp_sendCMD_waitResp(cmd, uart=uart_esp, timeout=2000, targetResp="OK"):
    print(color.MAGENTA + cmd + color.END)
    uart.write(cmd)
    return esp_waitResp(uart, timeout, targetResp)

"""
esp_waitResp: wait for the ESP response
if targetResp is caught, return True
otherwise return False
"""
def esp_waitResp(uart=uart_esp, timeout=2000, targetResp="OK"):
    targetCatched = False
    prvMills = utime.ticks_ms()
    print(color.BLUE)
    while (utime.ticks_diff(utime.ticks_ms(), prvMills))<timeout:
        line=uart.readline()
        if line is not None:
            try:
                line_decoded = line.strip().decode()
                if line_decoded == targetResp:
                    print(color.GREEN + line_decoded)
                    targetCatched = True
                    break
                
                #more checking for Response
                elif line_decoded == "OK":
                    print(color.GREEN + line_decoded)
                    break
                elif line_decoded == "ERROR":
                    print(color.RED + line_decoded)
                    break
                
                elif line_decoded == "SEND OK":
                    print(color.GREEN + line_decoded)
                    break
                elif line_decoded == "SEND FAIL":
                    print(color.RED + line_decoded)
                    break
                
                else:
                    print(line_decoded)
            except UnicodeError:
                print(line)
    
    print(color.END)
    return targetCatched


def espSend(text="test", uart=uart_esp):
    esp_sendCMD_waitResp('AT+CIPSEND=' + str(len(text)) + '\r\n')
    esp_waitResp(targetResp=">")
    esp_sendCMD_waitResp(text)

# In my test, something like
# "################################################"
# follows 'ready' after AT+RST/AT+RESTORE or a hardware reset.
# Just do a dummy wait for response to clear it.
def esp_waitDummtResp(uart=uart_esp, timeout=2000):
    esp_waitResp(uart=uart, timeout=timeout)

#==================================

print()
print("=== Start ===")

#hardware reset ESP
print("Hardware reset")
esp_reset.value(1)
utime.sleep(0.5)
esp_reset.value(0)
utime.sleep(0.5)
esp_reset.value(1)

print("wait 'ready' from Hardware Reset\n")
esp_waitResp(targetResp='ready')
esp_waitDummtResp()

esp_sendCMD_waitResp('AT\r\n')          #Test AT startup

esp_sendCMD_waitResp('AT+GMR\r\n')      #Check version information

esp_sendCMD_waitResp('AT+RESTORE\r\n')
esp_waitResp(targetResp='ready')    #wait ready
esp_waitDummtResp()

esp_sendCMD_waitResp('AT+CIPAPMAC?\r\n')  #Query MAC address of ESP SoftAP
esp_sendCMD_waitResp('AT+CIPSTAMAC?\r\n') #Query MAC address of ESP station

esp_sendCMD_waitResp('AT+CWMODE?\r\n')  #Query the Wi-Fi mode
esp_sendCMD_waitResp('AT+CWMODE=1\r\n') #1 = Station mode
                                    #2 = SoftAP mode
esp_sendCMD_waitResp('AT+CWMODE?\r\n')  #Query the Wi-Fi mode again

esp_sendCMD_waitResp('AT+CWJAP="ssid","password"\r\n', timeout=5000) #Connect to AP
esp_sendCMD_waitResp('AT+CIFSR\r\n')    #Obtain the Local IP Address

esp_sendCMD_waitResp('AT+CIPSTART="TCP","' +
                     server_ip + '",' +
                     str(server_port) + '\r\n')

espSend()

while True:
    print('Enter something:')
    msg = input()
    esp_sendCMD_waitResp('AT+CIPSTART="TCP","' +
                         server_ip + '",' +
                         str(server_port) + '\r\n')
    espSend(msg)
    
print("\n~ bye ~\n");
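One detail about espSend(): AT+CIPSEND expects the data length in bytes, while len(text) counts characters. For plain ASCII they are the same, but for non-ASCII text they differ. A minimal sketch of computing the length from the encoded bytes, assuming UTF-8 is the wanted encoding; `build_cipsend` is a hypothetical helper, not part of the exercise:

```python
def build_cipsend(text):
    """Return (AT command, payload bytes) for sending text.

    Assumption: the text is sent UTF-8 encoded, so the length
    passed to AT+CIPSEND is the number of encoded bytes.
    """
    payload = text.encode('utf-8')
    command = 'AT+CIPSEND=' + str(len(payload)) + '\r\n'
    return command, payload

print(build_cipsend('test'))
```

The command string would be written first, then the payload after the ">" prompt, in the same way espSend() does.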

pyMyTCPServer.py, TCP Server run on Raspberry Pi.
"""
Simple Python TCP Server
tested on Raspberry Pi/Python3

ref:
https://docs.python.org/3/library/socketserver.html
"""
import socketserver
import platform

print("sys info:")
for info in platform.uname():
    print(info)

class MyTCPHandler(socketserver.BaseRequestHandler):
    """
    The request handler class for our server.

    It is instantiated once per connection to the server, and must
    override the handle() method to implement communication to the
    client.
    """

    def handle(self):
        # self.request is the TCP socket connected to the client
        self.data = self.request.recv(1024).strip()
        print("{} wrote:".format(self.client_address[0]))
        print(self.data)
        # send back the client address, followed by the same data upper-cased
        self.request.sendall(self.client_address[0].encode())
        self.request.sendall(self.data.upper())
        self.request.sendall(b'\r\n')

if __name__ == "__main__":
    HOST, PORT = "localhost", 9999

    # Create the server, binding to all interfaces ('') on port 9999
    #with socketserver.TCPServer((HOST, PORT), MyTCPHandler) as server:
    with socketserver.TCPServer(('', PORT), MyTCPHandler) as server:
        # Activate the server; this will keep running until you
        # interrupt the program with Ctrl-C

        server.serve_forever()
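To check the server logic without the Pico/ESP hardware, the handler can be exercised from a desktop Python client. The following is a self-contained sketch under my own assumptions: it runs a copy of the handler on 127.0.0.1 with a free port picked by the OS, and `UpperHandler`, `query_server` and `demo` are hypothetical names.

```python
import socket
import socketserver
import threading

class UpperHandler(socketserver.BaseRequestHandler):
    """Same reply logic as MyTCPHandler: client address + upper-cased data."""
    def handle(self):
        data = self.request.recv(1024).strip()
        self.request.sendall(self.client_address[0].encode())
        self.request.sendall(data.upper())
        self.request.sendall(b"\r\n")

def query_server(host, port, text, timeout=5.0):
    """Send text once and read the reply until the server closes."""
    with socket.create_connection((host, port), timeout=timeout) as sock:
        sock.sendall(text.encode())
        chunks = []
        while True:
            chunk = sock.recv(1024)
            if not chunk:
                break
            chunks.append(chunk)
    return b"".join(chunks)

def demo():
    # port 0 lets the OS pick a free port for this local test
    with socketserver.TCPServer(("127.0.0.1", 0), UpperHandler) as server:
        port = server.server_address[1]
        worker = threading.Thread(target=server.serve_forever, daemon=True)
        worker.start()
        try:
            return query_server("127.0.0.1", port, "hello")
        finally:
            server.shutdown()

if __name__ == "__main__":
    print(demo())
```

Since the handler returns after one reply, the connection is closed by the server each time, which is why the Pico client above issues AT+CIPSTART again before every send.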

mpyPico_ESP_AT_UDP.py, a UDP client that sends to pyUDP_server.py (listed below) running on Raspberry Pi.
"""
Raspberry Pi Pico/MicroPython + ESP-C3-12F exercise

ESP-C3-12F with AT-command firmware:
AT version:2.2.0.0(s-90458f0 - ESP32C3 - Jun 18 2021 10:24:22)

Pico send AT command to ESP-C3-12F via UART,
- set in station mode
- join AP
- connect to UDP
- send text and wait response

ref:
https://docs.espressif.com/projects/esp-at/en/release-v2.2.0.0_esp32c3/
AT_Command_Examples/TCP-IP_AT_Examples.html
Example 3.2. UDP Transmission with Changeable Remote IP and Port
"""

import usys, uos
import machine
import utime

class color:
    BLACK =   '\033[1;30;48m'
    RED =     '\033[1;31;48m'
    GREEN =   '\033[1;32;48m'
    YELLOW =  '\033[1;33;48m'
    BLUE =    '\033[1;34;48m'
    MAGENTA = '\033[1;35;48m'
    CYAN =    '\033[1;36;48m'
    END =     '\033[1;37;0m'
   
#server port & ip hard-coded,
#have to match with server side setting
server_ip="192.168.120.147"
server_port=8000

print("====================================")
print(usys.implementation[0], uos.uname()[3],
      "\nrun on", uos.uname()[4])
print("------------------------------------")


esp_reset = machine.Pin(2, machine.Pin.OUT)

#uart_esp =machine.UART(0, baudrate=115200)
uart_esp =machine.UART(0, timeout=1000)
#print("UART(0) connected to ESP: \n", uart_esp, "\n")

#==================================
"""
esp_sendCMD_waitResp: send command to ESP, and wait for the response
if targetResp is caught, return True
otherwise return False
"""
def esp_sendCMD_waitResp(cmd, uart=uart_esp, timeout=2000, targetResp="OK"):
    print(color.MAGENTA + cmd + color.END)
    uart.write(cmd)
    return esp_waitResp(uart, timeout, targetResp)

"""
esp_waitResp: wait for the ESP response
if targetResp is caught, return True
otherwise return False
"""
def esp_waitResp(uart=uart_esp, timeout=2000, targetResp="OK"):
    targetCatched = False
    prvMills = utime.ticks_ms()
    print(color.BLUE)
    while (utime.ticks_diff(utime.ticks_ms(), prvMills))<timeout:
        line=uart.readline()
        if line is not None:
            try:
                line_decoded = line.strip().decode()
                if line_decoded == targetResp:
                    print(color.GREEN + line_decoded)
                    targetCatched = True
                    break
                
                #more checking for Response
                elif line_decoded == "OK":
                    print(color.GREEN + line_decoded)
                    break
                elif line_decoded == "ERROR":
                    print(color.RED + line_decoded)
                    break
                
                elif line_decoded == "SEND OK":
                    print(color.GREEN + line_decoded)
                    break
                elif line_decoded == "SEND FAIL":
                    print(color.RED + line_decoded)
                    break
                
                else:
                    print(line_decoded)
            except UnicodeError:
                print(line)
    
    print(color.END)
    return targetCatched


def espSend(text="test", uart=uart_esp):
    esp_sendCMD_waitResp('AT+CIPSEND=' + str(len(text)) + '\r\n')
    esp_waitResp(targetResp=">")
    esp_sendCMD_waitResp(text)

# In my test, something like
# "################################################"
# follows 'ready' after AT+RST/AT+RESTORE or a hardware reset.
# Just do a dummy wait for response to clear it.
def esp_waitDummtResp(uart=uart_esp, timeout=2000):
    esp_waitResp(uart=uart, timeout=timeout)

#==================================

print()
print("=== Start ===")

#hardware reset ESP
print("Hardware reset")
esp_reset.value(1)
utime.sleep(0.5)
esp_reset.value(0)
utime.sleep(0.5)
esp_reset.value(1)

print("wait 'ready' from Hardware Reset\n")
esp_waitResp(targetResp='ready')
esp_waitDummtResp()

esp_sendCMD_waitResp('AT\r\n')          #Test AT startup

esp_sendCMD_waitResp('AT+GMR\r\n')      #Check version information

esp_sendCMD_waitResp('AT+RESTORE\r\n')
esp_waitResp(targetResp='ready')    #wait ready
esp_waitDummtResp()

esp_sendCMD_waitResp('AT+CIPAPMAC?\r\n')  #Query MAC address of ESP SoftAP
esp_sendCMD_waitResp('AT+CIPSTAMAC?\r\n') #Query MAC address of ESP station

esp_sendCMD_waitResp('AT+CWMODE=3\r\n') #3 = SoftAP+Station mode
esp_sendCMD_waitResp('AT+CWMODE?\r\n')  #Query the Wi-Fi mode again

esp_sendCMD_waitResp('AT+CWJAP="ssid","password"\r\n', timeout=5000) #Connect to AP
esp_sendCMD_waitResp('AT+CIFSR\r\n')    #Obtain the Local IP Address
            
esp_sendCMD_waitResp('AT+CIPSTART="UDP","' +
                     server_ip +
                     '",' + str(server_port) +
                     ',' + '1112,2\r\n')

espSend()

while True:
    print('Enter something:')
    msg = input()
    espSend(msg)

print("\n~ bye ~\n");
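The AT+CIPSTART string for UDP is easier to read when its fields are named. As I understand the ESP-AT example referenced above, the fields are remote host, remote port, local port and mode, where mode 2 lets the remote IP and port change to the latest sender. A small sketch; `build_udp_start` is a hypothetical helper, and its defaults are just the values used in this exercise:

```python
def build_udp_start(remote_ip, remote_port, local_port=1112, mode=2):
    """Build the AT+CIPSTART command for a UDP transmission.

    Assumption: field order is remote host, remote port,
    local port, mode, as in the ESP-AT UDP example.
    """
    return ('AT+CIPSTART="UDP","' + remote_ip + '",' +
            str(remote_port) + ',' +
            str(local_port) + ',' +
            str(mode) + '\r\n')

print(repr(build_udp_start("192.168.120.147", 8000)))
```

The result is the same string as the one concatenated by hand in the script above.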

pyUDP_server.py
"""
Simple Python UDP Server
tested on Raspberry Pi/Python3

ref:
https://wiki.python.org/moin/UdpCommunication
"""
import socket

#have to match my ip
UDP_IP = "192.168.120.147"
UDP_PORT = 8000

sock = socket.socket(socket.AF_INET, # Internet
                     socket.SOCK_DGRAM) # UDP
sock.bind((UDP_IP, UDP_PORT))

print("UDP IP: %s" % UDP_IP)
print("UDP port: %s" % UDP_PORT)

while True:
    data, addr = sock.recvfrom(1024) # buffer size is 1024 bytes
    print("received message: %s" % data)

Wednesday, October 20, 2021

ESP32-C3/MicroPython + SSD1306 I2C OLED

With MicroPython firmware installed on ESP32-C3, this exercise drives a 0.96 inch 128x64 OLED with SSD1306 I2C driver.

To drive the SSD1306 I2C OLED, install the MicroPython ssd1306 driver:
- Visit https://github.com/micropython/micropython/blob/master/drivers/display/ssd1306.py
- Copy ssd1306.py and upload it to the '/' directory of the MicroPython device.

In my exercise, the default I2C(0) is used to drive SSD1306; SCL = 18, SDA = 19.

Connection between:
ESP32-C3    I2C SSD1306 OLED
============================
GND         GND
3V3         VCC
18          SCL
19          SDA

Exercise code

mpyESP32C3_ssd1306.py, a minimal example. It also shows how to catch the OSError ENODEV raised when no I2C ssd1306 is connected.
"""
MicroPython/ESP32C3 exercise run on ESP32-C3-DevKitM-1,
work with SSD1306 I2C OLED.
And, catch exception of ENODEV.

"""
import uos
import usys
import machine
import time
import ssd1306
import errno    #needed for errno.ENODEV below

print("====================================")
print(usys.implementation[0], uos.uname()[3],
      "\nrun on", uos.uname()[4])
print("====================================")

oled_i2c = machine.I2C(0)
print("Default I2C:", oled_i2c)
"""
oled_ssd1306 = ssd1306.SSD1306_I2C(128, 64, oled_i2c)
print("Default SSD1306 I2C address:",
          oled_ssd1306.addr, "/", hex(oled_ssd1306.addr))
oled_ssd1306.text('Hello, World!', 0, 0, 1)
oled_ssd1306.show()

"""
try:
    oled_ssd1306 = ssd1306.SSD1306_I2C(128, 64, oled_i2c)
    print("Default SSD1306 I2C address:",
          oled_ssd1306.addr, "/", hex(oled_ssd1306.addr))
    oled_ssd1306.text('Hello, World!', 0, 0, 1)
    oled_ssd1306.show()
except OSError as exc:
    print("OSError!", exc)
    if exc.errno == errno.ENODEV:
        print("No such device")

print("~ bye ~")
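The error code of an OSError can also be read from exc.args[0], which works the same in MicroPython and desktop Python. A minimal sketch of mapping a few codes to messages; `explain_oserror` is a hypothetical helper and the message texts are my own:

```python
import errno

def explain_oserror(exc):
    """Return a short message for an OSError raised by an I2C operation."""
    # args[0] holds the numeric error code when one was given
    code = exc.args[0] if exc.args else None
    if code == errno.ENODEV:
        return "No such device (ENODEV): check wiring and I2C address"
    if code == errno.ETIMEDOUT:
        return "Bus timeout (ETIMEDOUT)"
    return "OSError " + str(code)

print(explain_oserror(OSError(errno.ENODEV)))
```

It could be called in the except block above as print(explain_oserror(exc)).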

mpyESP32C3_ssd1306_hello.py
"""
MicroPython/ESP32C3 exercise run on ESP32-C3-DevKitM-1,
work with SSD1306 I2C OLED.
HelloWorld! with something.

"""
import uos
import usys
import machine
import time
import ssd1306

print("====================================")
print(usys.implementation[0], uos.uname()[3],
      "\nrun on", uos.uname()[4])
print("====================================")

oled_i2c = machine.I2C(0)
print("Default I2C:", oled_i2c)

oled_ssd1306 = ssd1306.SSD1306_I2C(128, 64, oled_i2c)
print("Default SSD1306 I2C address:",
      oled_ssd1306.addr, "/", hex(oled_ssd1306.addr))

oled_ssd1306.fill(1)    #all on
oled_ssd1306.show()
time.sleep(1)
oled_ssd1306.fill(0)    #all off
oled_ssd1306.show()
time.sleep(1)

def splitLine(src, cnt):
    return [src[i:i+cnt] for i in range(0, len(src), cnt)]

oled_ssd1306.text('Hello, World!', 0, 0, 1)
oled_ssd1306.text(str(usys.implementation[0]), 0, 20, 1)
oled_ssd1306.text(str(uos.uname()[2]), 0, 30, 1)

line_y = 40
for l in splitLine(str(uos.uname()[4]), 15):
    oled_ssd1306.text(l, 0, line_y, 1)
    line_y += 10

oled_ssd1306.show()
time.sleep(1)

oled_ssd1306.invert(1)
oled_ssd1306.show()
time.sleep(0.5)
oled_ssd1306.invert(0)
oled_ssd1306.show()
time.sleep(0.5)

oled_ssd1306.rect(15, 15, oled_ssd1306.width-30,
                  oled_ssd1306.height-30, 1)
oled_ssd1306.show()
time.sleep(1)

for x in range(oled_ssd1306.width):
    oled_ssd1306.scroll(1, 0)
    oled_ssd1306.show()
    time.sleep_ms(50)

oled_ssd1306.fill_rect(15, 15, oled_ssd1306.width-30,
                  oled_ssd1306.height-30, 1)
oled_ssd1306.show()
time.sleep(0.5)
oled_ssd1306.text('~ bye ~', 55, 55, 1)
oled_ssd1306.show()
time.sleep(0.5)

print("~ bye ~")
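splitLine() above cuts the string into 15-character pieces. The built-in framebuf font is 8x8 pixels, so a 128-pixel-wide display fits 16 characters per line. A sketch that derives the chunk size from the display width instead of hard-coding it; `wrap_for_oled` is a hypothetical name:

```python
FONT_WIDTH = 8  # the built-in framebuf font is 8 pixels wide

def wrap_for_oled(text, display_width=128, font_width=FONT_WIDTH):
    """Split text into pieces that fit on one display line each."""
    per_line = display_width // font_width
    return [text[i:i + per_line] for i in range(0, len(text), per_line)]

for piece in wrap_for_oled("ESP32C3 module with ESP32C3"):
    print(piece)
```

With the default arguments each piece is at most 16 characters long.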

mpyESP32C3_ssd1306_pixel.py
"""
MicroPython/ESP32C3 exercise run on ESP32-C3-DevKitM-1,
work with SSD1306 I2C OLED.

Draw random pixels

"""
import uos
import usys
import machine
import time
import urandom
import ssd1306

print("====================================")
print(usys.implementation[0], uos.uname()[3],
      "\nrun on", uos.uname()[4])
print("====================================")

oled_i2c = machine.I2C(0)
print("Default I2C:", oled_i2c)

oled_ssd1306 = ssd1306.SSD1306_I2C(128, 64, oled_i2c)
print("Default SSD1306 I2C address:",
          oled_ssd1306.addr, "/", hex(oled_ssd1306.addr))

def drawRandomPixels():
    for i in range(500):
        # randint() includes both ends, so the upper bound is size-1
        rx = urandom.randint(0, oled_ssd1306.width - 1)
        ry = urandom.randint(0, oled_ssd1306.height - 1)
        oled_ssd1306.pixel(rx, ry, 1)
        oled_ssd1306.show()
    
oled_ssd1306.fill(0)
oled_ssd1306.text('Random Pixels', 0, 0, 1)
oled_ssd1306.show()
drawRandomPixels()

oled_ssd1306.fill(0)
oled_ssd1306.text('Random Pixels', 0, 0, 1)
oled_ssd1306.invert(1)
oled_ssd1306.show()
drawRandomPixels()

print("~ bye ~")

mpyESP32C3_ssd1306_text.py
"""
MicroPython/ESP32C3 exercise run on ESP32-C3-DevKitM-1,
work with SSD1306 I2C OLED.

Display scrolling text (read from /boot.py) on OLED.

"""
import uos
import usys
import machine
import time
import ssd1306

# Read and display /boot.py file
def readBoot():
    print("========================")
    fileBoot = "/boot.py"
    print(fileBoot)
    print("========================")
    with open(fileBoot, "r") as f:
        line = f.readline()
        while line != '':
            line = line.rstrip() #strip trailing whitespace
            print(line)
            oled_ssd1306.scroll(0, -10)
            oled_ssd1306.text(line, 0, 46, 1)
            oled_ssd1306.show()
            time.sleep(0.5)
            line = f.readline()
    print("========================")

print("====================================")
print(usys.implementation[0], uos.uname()[3],
      "\nrun on", uos.uname()[4])
print("====================================")

oled_i2c = machine.I2C(0)
print("Default I2C:", oled_i2c)

oled_ssd1306 = ssd1306.SSD1306_I2C(128, 64, oled_i2c)
print("Default SSD1306 I2C address:",
          oled_ssd1306.addr, "/", hex(oled_ssd1306.addr))
oled_ssd1306.text('Hello, World!', 0, 0, 1)
oled_ssd1306.show()

readBoot()

print("~ bye ~")

Monday, October 18, 2021

Flash MicroPython firmware on ESP32-C3

Updated@2022-06-19

~ Flash MicroPython v1.19 firmware on ESP32-C3 (ESP32-C3-DevKitM-1/NodeMCU ESP-C3-32S-Kit)

The aim is to flash MicroPython firmware on an ESP32-C3-DevKitM-1, reported as "unknown ESP32-C3 (revision 3)" with 4MB flash. I couldn't find any official installation instructions, so the steps below were worked out by guessing and trying. I'm not sure it is the correct approach, but it seems to work for me.

Download MicroPython for ESP32-C3:

Visit https://micropython.org/download/all/, search for the "esp32c3" firmware for ESP32-C3, and download the .bin file. I tried esp32c3usb-20211018-unstable-v1.17-84-gba940250a.bin.

Identify port and chip/flash:

To identify the USB port the ESP32 device is connected to, and its chip and flash, refer to the last post.

In my case it's reported as unknown ESP32-C3 (revision 3) with 4MB flash on the ESP32-C3-DevKitM-1, connected to /dev/ttyUSB0.

Flash MicroPython firmware:

Erase flash, enter the command:

$ esptool.py --port /dev/ttyUSB0 erase_flash

Flash firmware with:
$ esptool.py --chip esp32c3 -p /dev/ttyUSB0 -b 460800 \
--before=default_reset --after=hard_reset write_flash --flash_mode dio \
--flash_freq 80m --flash_size 4MB \
0x0 esp32c3usb-20211018-unstable-v1.17-84-gba940250a.bin
* this writes the firmware to the ESP flash starting at address 0x0

Wrote 1495696 bytes (888871 compressed) at 0x00000000 in 26.6 seconds (effective 449.2 kbit/s)...

To verify that data in flash matches a local file:

$ esptool.py verify_flash --diff yes 0x0 <.bin>


MicroPython code tried:

mpyESP32C3_info.py
"""
MicroPython/ESP32C3 exercise run on ESP32-C3-DevKitM-1,
to display info.
"""
import uos
import usys
import machine
import esp

print("from uos.uname():")
for u in uos.uname():
    print(u)
print()

print("from usys:")
print("usys.platform: ", usys.platform)
print("usys.implementation: ", usys.implementation)
print()

print("====================================")
print(usys.implementation[0], uos.uname()[3],
      "\nrun on", uos.uname()[4])
print("====================================")
print("Flash size:", esp.flash_size())
print("CPU frequency:", machine.freq(), "(Hz)")

mpyESP32C3_RGB.py
"""
MicroPython/ESP32C3 exercise run on ESP32-C3-DevKitM-1,
to control the onboard RGB LED (WS2812), driven by GPIO8.
"""

import uos
import usys
import machine
import neopixel
import time

print("====================================")
print(usys.implementation[0], uos.uname()[3],
      "\nrun on", uos.uname()[4])
print("====================================")

np = neopixel.NeoPixel(machine.Pin(8), 1)

np[0] = (0, 0, 0)
np.write()
time.sleep(1)
np[0] = (100, 100, 100)
np.write()
time.sleep(1)
np[0] = (0, 0, 0)
np.write()
time.sleep(1)

for i in range(256):
    np[0] = (i, 0, 0)
    np.write()
    time.sleep_ms(10)
for i in range(256):
    np[0] = (0, i, 0)
    np.write()
    time.sleep_ms(10)
for i in range(256):
    np[0] = (0, 0, i)
    np.write()
    time.sleep_ms(10)

for i in range(256):
    np[0] = (255-i, 255-i, 255-i)
    np.write()
    time.sleep_ms(10)



Here is another form of the flash command, modified from "Getting started with MicroPython on the ESP32 > Deploying the firmware", with the starting address changed to 0x0.

$ esptool.py --chip esp32c3 --port /dev/ttyUSB0 write_flash -z \
0x0 esp32c3usb-20211018-unstable-v1.17-84-gba940250a.bin
but it takes longer:

Wrote 1495696 bytes (888871 compressed) at 0x00000000 in 79.6 seconds (effective 150.4 kbit/s)...
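The two "effective" figures reported by esptool can be approximately reproduced from the byte count and the elapsed time. A quick arithmetic sketch (`kbit_per_s` is a hypothetical helper); the small differences from the printed figures are presumably due to rounding of the displayed time. My guess is that the second command is slower because it omits -b 460800 and so runs at esptool's default baud rate.

```python
def kbit_per_s(num_bytes, seconds):
    """Throughput in kbit/s for num_bytes transferred in seconds."""
    return num_bytes * 8 / seconds / 1000

fast = kbit_per_s(1495696, 26.6)   # first command, with -b 460800
slow = kbit_per_s(1495696, 79.6)   # second command, without -b

print("first command:  %.1f kbit/s" % fast)
print("second command: %.1f kbit/s" % slow)
print("ratio: %.1f" % (fast / slow))
```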





Tuesday, March 16, 2021

ESP32/MicroPython: get wifi network info, scan networks

Get the WiFi MAC address and network info such as IP, netmask...

from os import uname
from sys import implementation
import network
import ubinascii

ssid = "ssid"
password = "password"

print(implementation.name)
print(uname()[3])
print(uname()[4])
print()

mac = ubinascii.hexlify(network.WLAN().config('mac'),':').decode()
print("MAC: " + mac)
print()

def do_connect():
    print('connect to network...')
    wlan = network.WLAN(network.STA_IF)
    wlan.active(True)
    if not wlan.isconnected():
        print('...')
        wlan.connect(ssid, password)
        while not wlan.isconnected():
            pass
    
    print()
    print('network config:')
    print("interface's IP/netmask/gw/DNS addresses")
    print(wlan.ifconfig())
    
do_connect()

print('- bye -')


Scan WiFi networks:


from os import uname
from sys import implementation
import network
import ubinascii
import utime

ssid = "ssid"
password = "password"

print(implementation.name)
print(uname()[3])
print(uname()[4])
print()

mac = ubinascii.hexlify(network.WLAN().config('mac'),':').decode()
print("MAC: " + mac)
print()

#init ESP32 as STA
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
wlan.disconnect()
utime.sleep(1)

def do_connect():
    global wlan
    print('connect to network...')
    
    wlan.active(True)
    if not wlan.isconnected():
        print('...')
        wlan.connect(ssid, password)
        while not wlan.isconnected():
            pass
    
    print()
    print('network config:')
    print("interface's IP/netmask/gw/DNS addresses")
    print(wlan.ifconfig())
    
def do_scan():
    global wlan
    print('scan network...')
    wlan.active(True)
    for ap in wlan.scan():    #avoid shadowing the network module
        print(ap)
    
do_scan()

print('\n- bye -')
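Each entry returned by wlan.scan() is a tuple of (ssid, bssid, channel, RSSI, security, hidden), with ssid and bssid as bytes. A sketch of printing one entry in a more readable form; `format_scan_entry` is a hypothetical helper, the security names follow the MicroPython WLAN.scan() documentation, and the sample tuple is made up:

```python
AUTH_NAMES = {
    0: "open",
    1: "WEP",
    2: "WPA-PSK",
    3: "WPA2-PSK",
    4: "WPA/WPA2-PSK",
}

def format_scan_entry(entry):
    """Format one wlan.scan() tuple as a single text line."""
    ssid, bssid, channel, rssi, security, hidden = entry[:6]
    mac = ":".join("%02x" % b for b in bssid)
    name = ssid.decode() if ssid else "<hidden>"
    auth = AUTH_NAMES.get(security, "unknown(%d)" % security)
    return "%s  %s  ch%d  %ddBm  %s" % (name, mac, channel, rssi, auth)

# made-up entry, for illustration only
sample = (b"MyAP", b"\xaa\xbb\xcc\xdd\xee\xff", 6, -47, 3, False)
print(format_scan_entry(sample))
```

In do_scan() it could replace the plain print of each tuple.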



ref:
~ Docs > Quick reference for the ESP32 > Networking


Friday, September 11, 2020

MicroPython on pyboard to display text on OLED (with I2C interface ssd1306)

This is a 0.96 inch 128x64 OLED display with ssd1306 driver and I2C interface. This example shows how to display text on it with pyboard using the MicroPython ssd1306 library.

Connection between pyboard and OLED:
pyboard 3V3 connected to ssd1306 VCC
pyboard GND connected to ssd1306 GND
pyboard X12 connected to ssd1306 SCL
pyboard X11 connected to ssd1306 SDA


Download the MicroPython ssd1306 library from https://github.com/micropython/micropython/blob/master/drivers/display/ssd1306.py, and save it to pyboard flash.

MicroPython script, pyb_i2c_ssd1306.py:

# Download ssd1306
# https://github.com/micropython/micropython/blob/master/drivers/display/ssd1306.py
# and save to pyboard flash

import ssd1306
import machine
import time
from micropython import const

WIDTH = const(128)
HEIGHT = const(64)

ssd1306_scl = machine.Pin('X12', machine.Pin.OUT_PP)
ssd1306_sda = machine.Pin('X11', machine.Pin.OUT_PP)
i2c_ssd1306 = machine.I2C(scl=ssd1306_scl, sda=ssd1306_sda)

print(i2c_ssd1306.scan())
oled = ssd1306.SSD1306_I2C(WIDTH, HEIGHT, i2c_ssd1306)
oled.fill(0)

oled.text("MicroPython", 0, 0)
oled.text("OLED(ssd1306)", 0, 10)
oled.text("pyboard", 0, 20)
oled.show()

while True:
    time.sleep(1)
    oled.invert(1)
    time.sleep(1)
    oled.invert(0)


pyboard MicroPython exercise: scan I2C address

This is a MicroPython exercise running on pyboard, to scan the I2C address of an attached I2C device. Actually, the I2C device used is the OLED with I2C interface (ssd1306) featured in the next exercise.

i2c_scan.py

#I2C SCL connected to pyboard X12
#I2C SDA connected to pyboard X11

import machine
i2c_scl = machine.Pin('X12', machine.Pin.OUT_PP)
i2c_sda = machine.Pin('X11', machine.Pin.OUT_PP)

i2c = machine.I2C(scl=i2c_scl, sda=i2c_sda)
print(i2c.scan())
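i2c.scan() returns the addresses as a list of decimal integers, while datasheets usually give them in hex. A small sketch for showing both; `describe_scan` is a hypothetical helper, and 0x3C/0x3D are the addresses commonly used by SSD1306 modules:

```python
SSD1306_ADDRESSES = (0x3C, 0x3D)  # common SSD1306 I2C addresses

def describe_scan(addresses):
    """Return one text line per scanned I2C address."""
    lines = []
    for addr in addresses:
        note = " (possibly SSD1306)" if addr in SSD1306_ADDRESSES else ""
        lines.append("%d = 0x%02x%s" % (addr, addr, note))
    return lines

# [60] is used here as a stand-in for the result of i2c.scan()
for line in describe_scan([60]):
    print(line)
```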


Tuesday, August 18, 2020

MicroPython (ESP32): micropython.mem_info() to get memory information

The function mem_info() of the micropython module prints information about currently used memory. If the verbose argument is given then extra information is printed.

The information that is printed is implementation dependent, but currently includes the amount of stack and heap used. In verbose mode it prints out the entire heap indicating which blocks are used and which are free.

ref: https://docs.micropython.org/en/latest/library/micropython.html?highlight=mem_info#micropython.mem_info
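A minimal sketch of calling it. The import guard is my own addition so that the same file also runs on desktop Python, where the micropython module does not exist; `report_memory` is a hypothetical name.

```python
try:
    import micropython
except ImportError:
    micropython = None  # not running on MicroPython

def report_memory(verbose=False):
    """Print memory info; return False if not running on MicroPython."""
    if micropython is None:
        print("micropython module not available")
        return False
    if verbose:
        micropython.mem_info(1)   # also prints the heap block map
    else:
        micropython.mem_info()    # stack and heap usage summary
    return True

report_memory()
```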




Thursday, August 13, 2020

Simple web server on ESP32 using MicroPython


This is an ESP32/MicroPython exercise to implement a simple web server.


ap.py
try:
    import usocket as socket
except ImportError:
    import socket
  
import network
import esp
import uos

uname = uos.uname()

ap = network.WLAN(network.AP_IF)
ap.active(True)

ssid = "ssid"
password = "password"
ap.config(essid=ssid, password=password)

while not ap.active():
    pass

print('Access Point Active')
print(ap.ifconfig())
print("Connect to " + ssid + ":" + "password = " + password)
# ifconfig() returns (IP, netmask, gateway, DNS); [0] is the AP's own IP
print("Visit: " + ap.ifconfig()[0] + ":80")

print('===== ===== =====')

res_01 = """<html>
    <head><meta name="viewport" content="width=device-width, initial-scale=1"></head>
    <body>
    <h1>Hello, MicroPython@ESP32!</h1>"""
res_02= """
    </body>
    </html>"""

res_something = "sysname: " + uname.sysname + "<br/>" +\
    "nodename: " + uname.nodename + "<br/>" +\
    "release: " + uname.release + "<br/>" +\
    "version: " + uname.version + "<br/>" +\
    "machine: " + uname.machine + "<br/>"

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind(('', 80))
s.listen(5)

while True:
    conn, addr = s.accept()
    youraddr = str(addr)
    request = conn.recv(1024)

    conn.send(res_01)
    conn.send(res_something)
    conn.send("<br/><br/>")
    conn.send(youraddr)
    conn.send("<br/><br/>")
    conn.send(request)
    conn.send(res_02)
    conn.close()
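The loop above sends the HTML without an HTTP status line or headers, which some browsers may refuse to render. A sketch of building a complete response first; `build_response` is a hypothetical helper, and the header set is my own minimal choice:

```python
def build_response(body, status="200 OK"):
    """Return a complete HTTP/1.1 response as bytes for an HTML body."""
    payload = body.encode("utf-8")
    lines = [
        "HTTP/1.1 " + status,
        "Content-Type: text/html; charset=utf-8",
        "Content-Length: " + str(len(payload)),
        "Connection: close",
        "",   # blank line ends the header section
        "",
    ]
    return "\r\n".join(lines).encode("utf-8") + payload

print(build_response("<h1>hi</h1>"))
```

In the server loop, the pieces (res_01, res_something and so on) would be joined into one string and sent with a single conn.send(build_response(...)).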


Tuesday, August 11, 2020

microPython exercise (ESP32): get system info

The uos module contains functions for filesystem access and mounting, terminal redirection and duplication, and the uname and urandom functions. Its uname() function returns a tuple containing information about the underlying machine and/or its operating system. The tuple has five fields in the following order, each of them being a string:

  • sysname – the name of the underlying system
  • nodename – the network name (can be the same as sysname)
  • release – the version of the underlying system
  • version – the MicroPython version and build date
  • machine – an identifier for the underlying hardware (eg board, CPU)

Example, sysinfo.py

import uos

uname = uos.uname()
print(uname)
print("sysname: " + uname.sysname)
print("nodename: " + uname.nodename)
print("release: " + uname.release)
print("version: " + uname.version)
print("machine: " + uname.machine)