=====================================================================================
<< BACK >>
=====================================================================================
ws/Http Server
import os
import ssl
import json
import time
import asyncio
import threading
import websockets
import sys
import subprocess
from http.server import HTTPServer, SimpleHTTPRequestHandler
from gpiozero import AngularServo, LED
from gpiozero.pins.pigpio import PiGPIOFactory
# --- 1. グローバル変数の宣言と初期化 ---
# 変数名だけ用意
factory = None
servo_ud = None
servo_rl = None
ssr = None
httpd = None
running = True
is_homing = False
light_ = 0
rl_ang, rl_dir = 0, 0
ud_ang, ud_dir = 0, 0
# --- 2. HTTPハンドラ(ブラウザに画面を出す役目) ---
class RestrictedHTTPHandler(SimpleHTTPRequestHandler):
def do_GET(self):
allowed_extensions = {'.html', '.css', '.js', '.ico'}
_, ext = os.path.splitext(self.path)
if self.path == "/" or ext.lower() in allowed_extensions:
return super().do_GET()
else:
self.send_error(403, "Access Denied.")
# --- 3. モーター制御ロジック(別スレッドで動く) ---
def motor_loop(servo, limit):
global ud_ang, rl_ang, ud_dir, rl_dir, running, is_homing
step = 0.5
is_ud = (servo == servo_ud)
servo.detach() # 最初は脱力
while running:
cur_dir = ud_dir if is_ud else rl_dir
if cur_dir != 0:
angle = ud_ang if is_ud else rl_ang
angle += (cur_dir * step)
angle = max(-limit, min(limit, angle))
servo.angle = angle
if is_ud: ud_ang = angle
else: rl_ang = angle
time.sleep(0.05)
continue
else:
if not is_homing and servo.value is not None:
servo.detach()
time.sleep(0.1)
# --- 4. WebSocket ハンドラー(スマホの命令をさばく) ---
async def ws_handler(websocket, path=None):
global light_, rl_ang, rl_dir, ud_ang, ud_dir, running, is_homing
try:
async for message in websocket:
data = json.loads(message)
cmd = data.get("cmd")
if cmd == 'get_pos':
pos_str = f"{int(-ud_ang)}/{int(-rl_ang)}"
await websocket.send(json.dumps(pos_str))
elif cmd in ['up', 'down', 'left', 'right', 'stop']:
if cmd == 'up': ud_dir = -1
elif cmd == 'down': ud_dir = 1
elif cmd == 'right': rl_dir = -1
elif cmd == 'left': rl_dir = 1
elif cmd == 'stop': ud_dir = rl_dir = 0
elif cmd == 'light':
light_ ^= 1
ssr.on() if light_ else ssr.off()
elif cmd == 'home':
is_homing = True
rl_ang = ud_ang = rl_dir = ud_dir = 0
servo_ud.angle = 0
servo_rl.angle = 0
time.sleep(1.5)
is_homing = False
pos_str = "0/0"
await websocket.send(json.dumps(pos_str))
except Exception:
pass
# --- 5. メイン実行部(ここが「本番」の順番) ---
if __name__ == '__main__':
# ① まずハードウェアを準備する
factory = PiGPIOFactory()
servo_ud = AngularServo(21, min_angle=-90, max_angle=90, min_pulse_width=0.0005, max_pulse_width=0.0024, pin_factory=factory, initial_angle=0)
servo_rl = AngularServo(20, min_angle=-90, max_angle=90, min_pulse_width=0.0005, max_pulse_width=0.0024, pin_factory=factory, initial_angle=0)
ssr = LED(26)
time.sleep(1.5) # 最初の位置合わせ完了を待つ
# ② 次に「モータースレッド」を裏側で走らせる
# これをサーバー起動(③)の「前」に書くのが重要!
threading.Thread(target=motor_loop, args=(servo_ud, 60), daemon=True).start()
threading.Thread(target=motor_loop, args=(servo_rl, 70), daemon=True).start()
# ③ サーバーの設定と起動
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
ssl_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
base_path = os.path.dirname(os.path.abspath(__file__))
ssl_ctx.load_cert_chain(os.path.join(base_path, 'localhost.pem'), os.path.join(base_path, 'localhost-key.pem'))
async def start_servers():
global httpd
async with websockets.serve(ws_handler, "0.0.0.0", 8700, ssl=ssl_ctx):
# HTTPSサーバーをさらに別スレッドで走らせる
httpd = HTTPServer(('', 443), RestrictedHTTPHandler)
httpd.socket = ssl_ctx.wrap_socket(httpd.socket, server_side=True)
threading.Thread(target=httpd.serve_forever, daemon=True).start()
print("--- System Ready ---")
while running: await asyncio.sleep(1)
# 実行
try:
loop.run_until_complete(start_servers())
except KeyboardInterrupt:
print("\nStopping...")
running = False
if httpd:
stop_thread = threading.Thread(target=httpd.shutdown)
stop_thread.start()
stop_thread.join()
finally:
if httpd: httpd.server_close()
servo_ud.close()
servo_rl.close()
ssr.off()
sys.exit(0)
if __name__ == ‘__main__’:
「このファイルが直接実行された時だけ、この下の処理を動かせ」**という決まり文句
なぜこれが必要なのか?
Pythonのファイルは、2つの使われ方をします。
- 直接実行される:
python inter_02.pyと打ち込んで動かす。 - インポートされる: 他のプログラムから
import inter_02として、中にある関数だけを借りる。
もしこの if 文がないと、他のファイルから関数を借りようとしただけで、HTTPSサーバーやWSSサーバーが勝手に起動してしまいます。
__name__ と '__main__' の正体
Pythonは実行時に、内部で特殊な変数 __name__ を自動で作ります。
- 直接実行したとき:
__name__の中身は'__main__'という文字列になります。 - import されたとき:
__name__の中身は'inter_02'(ファイル名)になります。
# --- 5. メイン実行部(ここが「本番」の順番) ---
if __name__ == '__main__':
# ① まずハードウェアを準備する
..............................................
# ③ サーバーの設定と起動
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
ssl_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) <- SSLの宣言
base_path = os.path.dirname(os.path.abspath(__file__))
ssl_ctx.load_cert_chain(os.path.join(base_path, 'localhost.pem'), os.path.join(base_path, 'localhost-key.pem')) <- SSL Fileの設定
async def start_servers():
global httpd
async with websockets.serve(ws_handler, "0.0.0.0", 8700, ssl=ssl_ctx): <- WSS Serverの宣言
# HTTPSサーバーをさらに別スレッドで走らせる
httpd = HTTPServer(('', 443), RestrictedHTTPHandler) <- HTTPS Serverの宣言
httpd.socket = ssl_ctx.wrap_socket(httpd.socket, server_side=True)
threading.Thread(target=httpd.serve_forever, daemon=True).start() <- HTTPS Serverの宣言
print("--- System Ready ---")
while running: await asyncio.sleep(1)
これ以降このループを実行するのみ。1秒に1回実行される
ー>しかし裏ではHttpsとWssサーバは稼働している。2つのサーバを稼働させる為の空ループ
終わりはrunning=Falseとなった時。これはプログラムで指定している。
# 実行
try:
loop.run_until_complete(start_servers())
start_servers()が終了するまで実行しなさいという関数
except KeyboardInterrupt:
print("\nStopping...")
running = False
if httpd:
stop_thread = threading.Thread(target=httpd.shutdown)
stop_thread.start()
stop_thread.join()
finally:
if httpd: httpd.server_close()
servo_ud.close()
servo_rl.close()
ssr.off()
sys.exit(0)
try: except KeyboardInterrupt: finally:の関係
まずはtry:が実行される
ー>loop.run_until_complete(start_servers())が実行
start_servers()が終了するまで実行しなさいという関数
start_servers()ではWss,Https Serverが起動。
ー>while running: await asyncio.sleep(1) が鍵
except KeyboardInterrupt:
インデントがtry:と同じ位置にある事に注意
runningはCTRL+Cが押された時にFalseとなる。この時初めてstart_servers()が終了する
同じ位置にあるとstart_servers()が終了後CTRL+C受け入れとなりstart_servers()が終了しない
つまりexcept KeyboardInterrupt:はstart_servers()を終了させる事が出来る位置
try:と同じ位置にいる必要がる。
ここでの処理は、async withで処理されないthreadの後処理
finally:
ここで最終処理を行う。
HTTPS Server関係
# --- 2. HTTPハンドラ(ブラウザに画面を出す役目) ---
class RestrictedHTTPHandler(SimpleHTTPRequestHandler):
def do_GET(self):
allowed_extensions = {'.html', '.css', '.js', '.ico'}
_, ext = os.path.splitext(self.path)
if self.path == "/" or ext.lower() in allowed_extensions:
return super().do_GET()
else:
self.send_error(403, "Access Denied.")
os.path.splitext(self.path) の動き
この関数は、パスを「ファイル名」と「拡張子」の2つに分割して、タプルという形式で返します。
例えば self.path が "/intercom.js" だった場合:
- 左側(名前):
"/intercom" - 右側(拡張子):
".js"
コードにある _, ext という書き方は、「左側の名前はいらない(無視する)ので _ に捨てて、右側の拡張子だけを ext という変数に入れる」 という意味です。
if ext.lower() in allowed_extensions:
ext(拡張子の部分)を小文字に変換した文字列がallowed_extensionsに有るかチェック
有る:super().do_GET() (標準のHTTP操作を行う関数)
WebSocketサーバー関係
# --- 4. WebSocket ハンドラー(スマホの命令をさばく) ---
async def ws_handler(websocket, path=None):
global light_, rl_ang, rl_dir, ud_ang, ud_dir, running, is_homing
try:
async for message in websocket:
data = json.loads(message) <ーデータを辞書形式に変換
cmd = data.get("cmd") <ー{"cmd":, "値"} から"cmd"に対する値の取得
if cmd == 'get_pos': <ー位置データを送信
pos_str = f"{int(-ud_ang)}/{int(-rl_ang)}"
await websocket.send(json.dumps(pos_str))
elif cmd in ['up', 'down', 'left', 'right', 'stop']: <ーその他のコマンド処理
if cmd == 'up': ud_dir = -1
elif cmd == 'down': ud_dir = 1
elif cmd == 'right': rl_dir = -1
elif cmd == 'left': rl_dir = 1
elif cmd == 'stop': ud_dir = rl_dir = 0
elif cmd == 'light':
light_ ^= 1
ssr.on() if light_ else ssr.off()
elif cmd == 'home':
is_homing = True
rl_ang = ud_ang = rl_dir = ud_dir = 0
servo_ud.angle = 0
servo_rl.angle = 0
time.sleep(1.5)
is_homing = False
pos_str = "0/0"
await websocket.send(json.dumps(pos_str))
except Exception:
pass
async for message in websocket:et, path=None):
待機: 「スマホからメッセージは届いているかな?」と確認します。
譲渡(重要!): もし届いていなければ、**「届くまでこの処理は一時停止!その間、他の仕事(別のスマホの対応や、サーバーの維持など)をしていいよ!」**と、実行権を他に譲ります。
再開: スマホからデータが届いた瞬間、パッとこの場所に戻ってきて、次の行(json.loads など)を実行します。
data = json.loads(message)
役割:文字列を「辞書形式」に変換する
ブラウザからは {"cmd": "up", "value": 10} のような形式でデータが届きます。これは JSON(ジェイソン) という世界共通のデータ形式ですが、届いた瞬間は単なる「文字の羅列」です。
json.loads(): 「JSONの文字列(Load String)」を読み込んで、Python の 辞書(dict)型 に変換せよ、という命令です。- 結果: これにより、
data["cmd"]のようにキーを指定して値を取り出せるようになります。
cmd = data.get("cmd")
役割:辞書から「特定のデータ」を安全に取り出す
辞書になった data の中から、今回実行したい命令(cmd という名前のデータ)を取り出します。
- なぜ
data["cmd"]ではないのか?: もしブラウザからcmdというキーワードが含まれていない変なデータが届いた場合、data["cmd"]と書くとプログラムはエラーで強制終了してしまいます。 .get("cmd")のメリット:getを使うと、もしcmdが入っていなくてもエラーにならずにNone(空っぽ)を返してくれます。プログラムを突然死させないための**「安全装置」**です。
except Exception: pass
構文の分解
except Exception: 「すべてのエラー(例外)」をキャッチするという意味です。計算ミス(ZeroDivisionError)や、ネットワークの切断、タイポなど、Pythonが投げるほぼすべてのエラーがここにかかります。pass: 「何もしない」という命令です。エラーが起きたとき、本来なら画面に赤い文字でエラーメッセージが出ますが、それを一切出さずにスルーします。
なぜこのコードが使われるのか?inter_02.py のようなハードウェア制御では、以下のような「一瞬の不具合」でプログラム全体を止めたくない場合に使われます。
- 通信のノイズ: WebSocketで一瞬変なデータが届いたが、無視して次のデータを待てばいい時。
- ハードウェアの応答待ち: サーボモータが一瞬ビジーだったが、次のループでリトライすればいい時。
if __name__ == ‘__main__’:
「このファイルが直接実行された時だけ、この下の処理を動かせ」**という決まり文句
なぜこれが必要なのか?
Pythonのファイルは、2つの使われ方をします。
- 直接実行される:
python inter_02.pyと打ち込んで動かす。 - インポートされる: 他のプログラムから
import inter_02として、中にある関数だけを借りる。
もしこの if 文がないと、他のファイルから関数を借りようとしただけで、HTTPSサーバーやWSSサーバーが勝手に起動してしまいます。
__name__ と '__main__' の正体
Pythonは実行時に、内部で特殊な変数 __name__ を自動で作ります。
- 直接実行したとき:
__name__の中身は'__main__'という文字列になります。 - import されたとき:
__name__の中身は'inter_02'(ファイル名)になります。
つまり、if __name__ == '__main__': と書くことで、**「importされただけなら何もしない。直接実行された時だけ、サーバー起動などの重い処理を始めろ」**と指示しているのです。
=====================================================================================
is_ud = (servo == servo_ud)
# if文に書き換えた場合
if servo == servo_ud:
is_ud = True
else:
is_ud = False
cur_dir = ud_dir if is_ud else rl_dir
# if文に書き換えた場合
if is_ud:
cur_dir = ud_dir
else:
cur_dir = rl_dir
<< BACK >>
=====================================================================================
Thread
# 状態管理用: 0=停止, 1=増加(UP/RIGHT), -1=減少(DOWN/LEFT)
move_state_ud = 0
current_angle_ud = 0
def servo_loop():
global current_angle_ud
while True:
if move_state_ud == 1:
if current_angle_ud < 90:
current_angle_ud += 1 # 1度ずつ増やす
servo_ud.angle = current_angle_ud
elif move_state_ud == -1:
if current_angle_ud > -90:
current_angle_ud -= 1 # 1度ずつ減らす
servo_ud.angle = current_angle_ud
time.sleep(0.02) # この数値で回転速度を調整
# サーバー起動前にスレッドを開始
threading.Thread(target=servo_loop, daemon=True).start()
threading.Thread(
target=servo_loop, # 引数1:実行したい関数名(仕事の内容)
daemon=True # 引数2:親が死んだら一緒に死ぬ設定
)
1. daemon=True の場合(おすすめ)
- 動き: メインプログラム(サーバー)が
Ctrl+Cやエラーで終了すると、
サーボ制御スレッドも即座に、自動で終了します。 - メリット: プログラムを止めたのに、裏でモータを制御しようとする
処理だけが生き残って暴走するのを防げます。
2. daemon=False(または指定なし)の場合
- 動き: サーバーを終了させても、スレッドが動いている限りプログラム全体が終了しません。
- デメリット: 「サーバーを止めたはずなのに、なぜかコマンドラインに戻ってこない(裏でスレッドが無限ループし続けている)」という状態になり、手動でプロセスを殺す必要が出てきます。
もし servo_loop(speed) のように、関数に値を渡したい場合は、さらに args という引数を追加します。
# 例:引数(args)を使ってスピードを渡す場合
threading.Thread(target=servo_loop, args=(0.02,), daemon=True).start()
※
argsはタプル形式で渡す決まりなので、引数が1つの場合は(値,)とコンマを入れるのが Python の独特なルールです。
1. スレッドを「止める」方法
Pythonのスレッドには、外から無理やり殺す stop() のような関数はありません。ループの条件式を False にして、**「自分から終わってもらう」**のが唯一の安全な方法です。
Python
# 終了フラグを用意
thread_running = False
def servo_loop():
global current_angle_ud, thread_running
thread_running = True # 動いていることを示す
print("スレッド開始")
while thread_running: # フラグがTrueの間だけループ
if move_state_ud != 0:
# --- 制御処理 ---
# (省略)
pass
time.sleep(0.02)
print("スレッド終了")
# 止めたいときに実行
thread_running = False
2. スレッドを「再開」する方法
一度終了したスレッドオブジェクトは、二度と start() できません。 再開したいときは、**「新しいスレッドオブジェクトをもう一度作って start する」**必要があります。これをボタン操作の do_GET 内で毎回行うと、管理が大変になります。
3. 実務的な「負荷対策」の別解:待機時間を変える
「CPU負荷が気になるが、スレッドの作成・破棄を繰り返すのは面倒」という場合に最もよく使われる手法は、**「動いていない時の sleep 時間を長くする」**という方法です。
def servo_loop():
global current_angle_ud
while True:
if move_state_ud != 0:
# 動いている時は高速にチェック (50Hz)
# (制御処理)
time.sleep(0.02)
else:
# 止まっている時はチェックを遅くする (5Hz)
# これでCPU負荷は「ほぼ無」になります
time.sleep(0.2)
4. それでもスレッドを完全にやめたい場合
「ボタンが押された瞬間だけスレッドを生成し、止まったらスレッドを自滅させる」という構成にします。
def servo_loop():
global current_angle_ud, move_state_ud
# move_state_ud が 0 になるまで動いて、0 になったら関数を抜けて終了する
while move_state_ud != 0:
# (制御処理)
time.sleep(0.02)
print("仕事が終わったのでスレッド自滅します")
# do_GET側
if a == 'up':
if move_state_ud == 0: # まだ動いていなければ
move_state_ud = 1
threading.Thread(target=servo_loop, daemon=True).start()
else:
move_state_ud = 1 # すでに動いていれば方向だけ変える
elif a == 'stop':
move_state_ud = 0 # これで while ループが終わり、スレッドが消える
=====================================================================================
文字列を特定の文字(区切り文字)で分割
1. 基本的な分割 (split)
text = "apple,orange,banana"
fruits = text.split(",")
print(fruits)
# 出力: ['apple', 'orange', 'banana']
2. 空白で分割する場合
引数を指定せずに split() を使用。スペース、タブ、改行などの連続する空白文字をまとめて区切りとして扱います。
text = "Python Java Ruby"
languages = text.split()
print(languages)
# 出力: ['Python', 'Java', 'Ruby']
3. 分割回数を制限する (maxsplit)
第2引数に数値を渡すと、分割する回数を指定できます。データの「最初の1つだけ切り出したい」時に便利です。
text = "2023/10/27/entry/123"
data = text.split("/", 2)
print(data)
# 出力: ['2023', '10', '27/entry/123'] (2回だけ分割される)
4. 複数の異なる文字で分割したい場合
, や / など、複数の区切り文字が混在している場合は、標準ライブラリの re モジュール(正規表現) を使います。
import re
text = "apple,orange;banana blue"
# カンマ(,) セミコロン(;) スペース( ) のいずれかで分割
fruits = re.split(r'[,; ]', text)
print(fruits)
# 出力: ['apple', 'orange', 'banana', 'blue']
まとめ
やりたいこと 使うメソッド 特徴
特定の1文字で分ける text.split(',') 最も標準的
空白・改行で分ける text.split() 連続した空白も1つとして扱う
後ろから分ける text.rsplit(',', 1) 右側から指定回数分だけ分ける
複数種類の文字で分ける re.split(r'[; /]', text) 正規表現で柔軟に指定可能
=====================================================================================
演算子
演算子 意味 例 (a=10, b=3) 結果
+ 加算 a + b 13
- 減算 a - b 7
* 乗算 a * b 30
/ 除算 a / b 3.333...
// 切り捨て除算 a // b 3
% 剰余(あまり)a % b 1
** べき乗 a ** b 1000
== 等しい a == b
!= 等しくない a != b
> より大きい a > b
< より小さい a < b
>= 以上 a >= b
<= 以下 a <= b
= x = 5 x に 5 を代入
+= x += 3 x = x + 3
-= x -= 2 x = x - 2
*= x *= 4 x = x * 4
/= x /= 2 x = x / 2
and 論理積 すべての条件が True なら True
if score >= 80 and attendance >= 80:
or 論理和 どれか一つでも True なら True
if is_weekend or is_holiday:
not 論理否定 真理値を反転させる(True → False)
if not is_error:
is_error = False / is_error = not is_error -> is_errorはTrue
=====================================================================================
ファイルの読み書き
読み
全文を一度に読み込む
with open('log.txt', 'r', encoding='utf-8') as f:
data = f.read()
print(data)
1行ずつリストとして読み込む
f.readlines():一度に全部読み込みline.strip()で分割
f.readline()なら1行のみ読み込み
with open('config.txt', 'r', encoding='utf-8') as f:
lines = f.readlines()
for line in lines:
print(line.strip()) # strip()で改行文字を除去
末尾に改行コード(\n)を取り除く
1. .rstrip() を使う
.rstrip():「右側の文字を取り除く」。引数に何も指定しない場合、末尾の改行やスペースをすべて削除します。
with open("sample.txt", "r") as f:
line = f.readline().rstrip("\n") # 改行コードだけを確実に消す
print(line)
2. .strip() を使う(前後の空白も消す場合)
改行だけでなく、行の前後にある不要なスペースも一緒に消したい場合に便利です。
line = f.readline().strip()
3. スライスを使う
最後の1文字を削ることもできます。
line = f.readline()[:-1]
Fileの終わり
空(EOF)が返って来る。
ファイルポインタが末尾(EOF)に達したことを if 文で判断するには、**「読み取った結果が空(長さ0)であるか」**をチェックします。
もっとも一般的な判定方法です。Pythonでは空文字は「偽(False)」と判定されるため、if not で簡潔に書けます。
line = f.readline()
if not line:
# 読み込んだ結果が '' (空文字) だった場合 = EOF
print("ファイルの終わりに達しました")
書き
中身をすべて上書き
with open('status.txt', 'w', encoding='utf-8') as f:
f.write("REC_START")
追記
import datetime
# 'a' は Append(追記)の略
with open('log.txt', 'a', encoding='utf-8') as f:
now = datetime.datetime.now()
f.write(f"{now}: 録画を開始しました\n") # \n は改行
リストをまとめて書き込む
lines = ["apple\n", "banana\n", "cherry\n"]
with open('list.txt', 'w', encoding='utf-8') as f:
f.writelines(lines)
よく使うオプション(モード)
open() の第2引数で指定します。
'r': 読み込み(デフォルト)。ファイルがないとエラー。'w': 書き込み。既存の内容は消去されます。'a': 追記。既存の内容の末尾に書き足します。'rb': バイナリ読み込み(画像や動画ファイルを扱うとき)。'wb': バイナリ書き込み(画像や動画ファイルを扱うとき)。'ab': バイナリ追記。既存の内容の末尾に書き足します。ストリーミングの記録等'xb': 誤上書きの防止。同じ名前のファイルが既にあったらエラーにして上書きを阻止- FileExistsError という「例外(Exception)」を発生
try:
with open(‘important_data.bin’, ‘xb’) as f:
f.write(data)
except FileExistsError:
print(“エラー:同名のファイルが既に存在します!”)
- FileExistsError という「例外(Exception)」を発生
注意点
- 文字コード: Windowsで作ったファイルをラズパイ(Linux)で読む場合などは、
encoding='utf-8'を明示的に指定しないと文字化けすることがあります。 - パス: ファイルが実行中のPythonファイルと同じフォルダにない場合は、フルパス(例:
/home/pi/data.txt)を指定してください。 f.write()に渡せるのは 文字列(String)だけ です。数値を書き込みたい場合は、必ずstr()で変換するか、f-string(f"{num}")を使ってください- NG:
f.write(123)OK:f.write(str(123))
- NG:
バイナリモードの重要なルール
encodingを指定してはいけないopen('file', 'wb', encoding='utf-8')と書くと、Pythonは「バイナリ(数値の羅列)なのに文字コードがあるのはおかしい」と判断し、エラー(ValueError)を投げます。- 書き込むデータは
bytes型であること 文字列(str型)をそのままf.write()することはできません。文字列をバイナリとして書きたい場合は.encode()が必須です。- # 成功例
f.write(b’\x00\x01\x02′) # byte列
f.write(“text”.encode()) # bytesに変換されたデータ
# 失敗例
f.write(“text”)
TypeError: a bytes-like object is required, not ‘str’
- # 成功例
=====================================================================================
GPIO制御
import RPi.GPIO as GPIO
# ピン番号の数え方を指定(BOARD: 基板の端子順 / BCM: チップの番号)
GPIO.setmode(GPIO.BCM)
# 使用するピン(例:18番ピン)を出力モードに設定
GPIO.setup(18, GPIO.OUT)
# 出力をONにする(3.3V出力)
GPIO.output(18, GPIO.HIGH)
# 24番ピンを入力に設定。内部プルアップを有効にする。
GPIO.setup(24, GPIO.IN, pull_up_down=GPIO.PUD_UP)
# ピンの状態を読み込む
input_state = GPIO.input(24)
# 最後に必ずお掃除(これを忘れると次に使うときに警告が出ます)
GPIO.cleanup()
=====================================================================================
複数行の文字列定義
3つのダブルまたはシングルコーテーション ”””…””” or ”’…”’ を使用
multiline_string = """
This is a string that
spans multiple lines.
Newlines and indentation are preserved.
"""
print(multiline_string)
This is a string that
spans multiple lines.
Newlines and indentation are preserved.
\(バックスラッシュ)を使用
single_line_string = "This is a long string that is split across " \
"multiple lines in the source code."
print(single_line_string)
This is a long string that is split across multiple lines in the source code.
# To include newlines in the output:
string_with_breaks = "Line 1\n" \
"Line 2\n" \
"Line 3"
print(string_with_breaks)
Line 1
Line 2
Line 3
()を使用
concatenated_string = ("This text will be joined into a single line. "
"Spaces and newlines in the source code "
"between the quotes are ignored.")
print(concatenated_string)
This text will be joined into a single line. Spaces and newlines in the source code between the quotes are ignored.
=====================================================================================
get()
辞書から値を取得する
user = {"name": "John", "age": 30}
name = user.get("name")
name
"John"
無いキーを指定 -> Noneが帰って来る Errorでは無い
email = user.get("email")
print(email)
None
=====================================================================================
os.path.splitext()
path_stringを要素に分割
import os
path='/path/to/file.jpg'
root, extension = os.path.splitext(path)
print(root)
/path/to/file
print(extension)
.jpg
path_array = os.path.splitext(path)
path_array
('/path/to/file', '.jpg')
=====================================================================================
urlparse()
urlparse(): URL を部分文字列に分解するための機能
from urllib.parse import urlparse
url = "http://example.com/path/to/file.jpg?param=1"
parsed = urlparse(url)
print(parsed)
ParseResult(scheme='http', netloc='example.com', path='/path/to/file.jpg', params='', query='param=1', fragment='')
print(parsed.path)
/path/to/file.jpg
=====================================================================================
parse_qs(), iter(), next()
parse_qs(): クエリ文字列(パラメータ)を辞書・リストに変換
iter() : 要素を一つずつ取り出す
next(): 次の要素を取得
from urllib.parse import parse_qs
qs = 'key1=value1&key2=value2%201&key2=value2%2F2'
qs_d = parse_qs(qs)
print(qs_d)
{'key1': ['value1'], 'key2': ['value2 1', 'value2/2']}
qs_d['key2']
['value2 1', 'value2/2']
next(iter(qs_d))
'key1'
next(iter(qs_d))
'key1'
mm = iter(qs_d)
next(mm)
'key1'
next(mm)
'key2'
=====================================================================================
self.path.endswith
HTTP サーバーでリクエストされた URL のパスが特定の拡張子で終わっているかどうかを確認する
from http.server import BaseHTTPRequestHandler, HTTPServer
class MyRequestHandler(BaseHTTPRequestHandler):
def do_GET(self):
# リクエストされたパスが '.css' で終わる場合...
if self.path.endswith('.css'):
# CSSファイルを処理するロジック
self.send_response(200)
self.send_header("Content-type", "text/css")
self.end_headers()
# ... ファイルを読み込んでクライアントに送信 ...
else:
# それ以外の場合(例: HTMLページなど)を処理するロジック
# ...
=====================================================================================
for
リスト,タプル
fruits = ['apple', 'banana', 'orange']
for fruit in fruits:
print(fruit)
apple
banana
orange
辞書
scores = {'English': 90, 'Math': 80, 'Science': 70}
for subject, score in scores.items():
print(f'{subject}: {score}')
English: 90
Math: 80
Science: 70
range関数
range(1, 10, 2): 1から9までの範囲の整数を2ずつ増加
for i in range(1, 10, 2):
print(i)
1
3
5
7
9
breakとcontinue
breakは、任意の部分でループから抜け出す
for i in range(5):
if i == 3:
break
print(i)
0
1
2
contiuneは残りの処理をスキップし、次のループを処理
for i in range(5):
if i == 3:
continue
print(i)
0
1
2
4
=====================================================================================
文字列を抽出
半角文字も全角文字も同じ1文字としてカウントされる。
インデックス
[]にインデックスを指定。インデックスは1文字目が0。
s = 'abcde'
print(s[0])
# a
print(s[4])
# e
負の値:後ろからの位置指定。-1が最後尾。
print(s[-1])
# e
print(s[-5])
# a
スライス
[start:stop] → start <= x < stopの範囲の文字列を抽出。
startを省略すると先頭から、stopを省略すると末尾までの範囲となる
s = 'abcde'
print(s[1:3])
# bc
print(s[:3])
# abc
print(s[1:])
# bcde
負の値も使える。
print(s[-4:-2])
# bc
print(s[:-2])
# abc
print(s[-4:])
# bcde
[start:stop:step] → 開始位置startと終了位置stopに加えて、増分step。
stepに負の値を指定すると後ろから順番に抽出される。
print(s[1:4:2])
# bd
print(s[::2])
# ace
print(s[::3])
# ad
print(s[::-1])
# edcba
print(s[::-2])
# eca
=====================================================================================
f文字列
Python 3.6から導入された強力な機能。文字列内に直接Python式を埋め込みコードの可読性と効率を大幅に向上。
変数の値を文字列に挿入
name = "太郎"
age = 30
print(f"私の名前は{name}で、{age}歳です。")
実行結果:
私の名前は太郎で、30歳です。
式の評価
x = 10
y = 20
print(f"{x} + {y} = {x + y}")
実行結果:
10 + 20 = 30
メソッドの呼び出し
name = "python"
print(f"大文字: {name.upper()}")
実行結果:
大文字: PYTHON
辞書のアクセス
person = {"name": "花子", "age": 25}
print(f"{person['name']}さんは{person['age']}歳です。")
実行結果:
花子さんは25歳です
条件式(三項演算子)
x = 15
print(f"{x}は{'偶数' if x % 2 == 0 else '奇数'}です。")
実行結果:
15は奇数です。
フォーマット指定子の使用
pi = 3.14159
print(f"円周率は{pi:.2f}です。")
実行結果:
円周率は3.14です
左寄せ、中央寄せ、右寄せ
name = "Python"
print(f"{name:<10}|") # 左寄せ
print(f"{name:^10}|") # 中央寄せ
print(f"{name:>10}|") # 右寄せ
実行結果:
Python |
Python |
Python|
日付のフォーマット
from datetime import datetime
now = datetime.now()
print(f"現在時刻: {now:%Y-%m-%d %H:%M:%S}")
実行結果(実行時の日時により異なります):
現在時刻: 2024-07-28 12:34:56
2進数、8進数、16進数表現
num = 42
print(f"10進数: {num}")
print(f"2進数: {num:b}")
print(f"8進数: {num:o}")
print(f"16進数: {num:x}")
実行結果:
10進数: 42
2進数: 101010
8進数: 52
16進数: 2a
デバッグ用の=指定子
x = 10
y = 20
print(f"{x=}, {y=}")
実行結果:
x=10, y=20
=====================================================================================
時刻取得
- 基本的な日時取得 →
datetime.now() - 時刻の計算や変換を楽にしたい →
arrow - タイムゾーンを考慮したい →
pytz - 処理時間計測 →
timeit.default_timer()ortime.perf_counter() - UNIXタイムが必要 →
time.time()
| 用途 | 適したライブラリ |
|---|---|
| 現在の日時を取得 (ローカルタイム) | datetime.datetime.now() |
| 現在の時刻をエポック秒で取得 | time.time() |
| 時刻を特定のフォーマットで表示 | time.localtime() + time.strftime() |
| タイムゾーンを考慮した時刻取得 | datetime.now(pytz.timezone(…)) or arrow.now() |
| WebAPIや時刻処理を簡単に扱いたい | arrow.now() |
| 時間計測 (コードの実行時間) | timeit.default_timer() or time.perf_counter() |
1. datetime.datetime.now()
用途・シチュエーション:
- 一般的な日時取得: 日付や時刻を扱うシンプルな処理に最適.
- フォーマットの自由度:
.strftime()を使ってフォーマットを変更しやすい. - ローカル時刻の取得:
datetime.now()はデフォルトでシステムのローカルタイムを取得する.
注意点:
- タイムゾーン情報は含まれないため,グローバルな処理には
pytzとの併用が必要.
from datetime import datetime
# 現在の日時を取得
time_datetime= datetime.now()
print(time_datetime)
# 2025-03-20 16:25:35.654095
2. time.time()
用途・シチュエーション:
- エポック秒(Unix時間)の取得: 1970年1月1日からの経過秒数で時刻を扱いたいときに最適.
- 時間差の計測: 2つの
time.time()の差を取ることで処理時間を計測できる.
注意点:
- 可読性が低いため,直接ユーザー向けに表示する用途には適さない.
import time
# 現在の時刻(エポック秒)を取得
time_time = time.time()
print(time_time)
print(f"{time_time:.3f}") # 小数点以下3桁まで表示
# 1742455535.6543741
# 1742455535.654
3. time.localtime() + time.strftime()
用途・シチュエーション:
- ローカル時刻を文字列として扱う:
strftime()によりフォーマットを簡単に変更できる. - ログ出力やUI表示:
YYYY-MM-DD HH:MM:SSのような形に整えやすい.
注意点:
datetimeに比べると直感的でない場合がある.
import time
# 現在の時刻をフォーマットして表示
local_time = time.localtime()
formatted_time = time.strftime("%Y-%m-%d %H:%M:%S", local_time)
print(formatted_time)
# 2025-03-20 16:25:35
4. datetime.now(pytz.timezone("Asia/Tokyo"))
用途・シチュエーション:
- タイムゾーンを明示的に指定する必要がある場合: タイムゾーンが重要なアプリケーション(例: グローバル対応のWebサービス).
- 異なるタイムゾーン間での時刻変換:
pytz.timezone("UTC").localize(dt).astimezone(pytz.timezone("Asia/Tokyo"))などが可能.
注意点:
pytzは標準ライブラリではないため,pip install pytzが必要.
from datetime import datetime
import pytz
# タイムゾーンを指定して現在時刻を取得
time_pytz = pytz.timezone("Asia/Tokyo")
tokyo_time = datetime.now(time_pytz)
print(tokyo_time)
# 2025-03-20 16:25:35.943172+09:00
5. arrow.now()
用途・シチュエーション:
datetimeより直感的な時刻操作:.shift(days=1)で簡単に1日後の時刻を取得可能.- タイムゾーンを簡単に扱える:
.to('Asia/Tokyo')で変換できる. - WebアプリやAPIでの時刻処理: ISO 8601 形式 (
YYYY-MM-DDTHH:MM:SS+TZ) の出力が容易.
注意点:
arrowは標準ライブラリではなく,pip install arrowが必要.
import arrow
# 現在の時刻を取得
time_arrow= arrow.now()
print(time_arrow)
# 2025-03-20T16:25:36.045942+09:00
6. timeit.default_timer()
用途・シチュエーション:
- コードの実行時間計測:
start = timeit.default_timer(); do_something(); elapsed = timeit.default_timer() - start - 高精度な時間測定:
time.time()より精度が高い(環境によって異なる).
注意点:
datetimeやtimeとは異なり,時刻取得の用途には適さない.timeit.default_timer()は Windows ではtime.perf_counter(),Linux/macOS ではtime.monotonic()を内部的に使用する為,環境ごとに挙動が異なることがある.
import timeit
# 現在の時刻をエポック秒で取得
timestamp = timeit.default_timer()
print(timestamp)
# 15251.0218552
7. time.perf_counter()
用途・シチュエーション:
- 高精度な時間計測:
timeit.default_timer()同様,処理時間の測定に適している. - システムの時計と無関係に動作: システムの時刻とは無関係に動作する為,プロセスの実行時間の測定等に適しており,精度が高い.
注意点:
- 絶対的な時刻を取得するのではなく,相対時間の測定向け.
import time
# 高精度な時刻取得
current_time = time.perf_counter()
print(current_time)
# 15251.0220795
=====================================================================================
subprocess
外部処理をサブプロセスで行う。
subprocess.run()
- subprocess.run([“ls”])
- コマンドはリストで指定。
- シェルで実行するようなコマンドが実行できるが出力を取得出来ない。
- subprocess.run(“ls”,shell =True)
- パイプ(
|)、ワイルドカード(*)、環境変数展開、埋め込みコマンド(dir,cdなど)シェル機能が利用可能 - コマンドはスペース区切りの文字列で渡します
- パイプ(
- 戻り値
- proc = subprocess.run([“ls”],stdout = subprocess.PIPE, stderr = subprocess.PIPE)
- print(proc.stdout.decode(“utf8”)) // python側で出力を取る。
- cp = subprocess.run([‘ls’, ‘-a’])
- cp.returncode -> OK:1 NG: Other
- proc = subprocess.run([“ls”],stdout = subprocess.PIPE, stderr = subprocess.PIPE)
subprocess.Popen()
プロセスは並行に実行.起動したものが終了しているかどうかに関わらず,Popenの後に記述されたコードは実行
されます.
| 処理名 | 概要 |
|---|---|
| Popen() | プロセスの初期化、実行 |
| Popen.wait() | プロセスの終了を待機 プロセス終了後、終了コード(リターンコード)を返す |
| Popen.poll() | 非同期的にプロセスの出力を取得 |
| Popen.communicate() | 標準入力を渡し、標準出力と標準エラーを取得する |
| (stdout, stderr) | プロセスの標準出力や標準エラーを取得 |
Popen.wait()
# 'sleep 2' コマンドを実行するプロセスを起動
process = subprocess.Popen(['sleep', '2'])
print("プロセスを開始しました。")
# プロセスが終了するまで待機
return_code = process.wait()
print(f"プロセスが終了しました。終了コード: {return_code}")
Popen.poll()
import subprocess
import time
# プロセスを開始 (終了を待たない)
process = subprocess.Popen(['sleep', '5']) # 例: 5秒待機するコマンド
print("プロセスを開始しました。")
# プロセスが終了するまでポーリングしつつ、他の処理を行う
while True:
return_code = process.poll() # プロセスの状態をチェック
if return_code is not None:
print(f"プロセスが終了しました。終了コード: {return_code}")
break
else:
print("プロセスはまだ実行中です...")
time.sleep(1) # 1秒待ってから再度チェック
Popen.communicate()
import subprocess
import time
process = subprocess.Popen(
["grep", ".*\.txt"],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
)
INPUT = "file_0.txt\nfile_1.txt\nfile_2.txt\nimg_1.jpg\nimg_2.jpg\nimg_3.jpg\n"
stdout, stderr = process.communicate(input=INPUT)
print("STDOUT:", stdout)
print("STDERR:", stderr)
=====================================================================================
リスト、タプル、辞書、集合
リスト [ ]
要素をカンマで区切って全体を大カッコ[]で囲む。
変更可。要素の挿入と削除を行うことができる。インデックス(番号)で要素にアクセス(読み書き)出来る。
List = ["A", "B", "C", "D", "E"]
print(List)
# ⇒ ['A', 'B', 'C', 'D', 'E']
List = ["A", "B", "C", "D", "E"]
print(List[2])
# ⇒ C
List = ["A", "B", "C", "D", "E"]
List[2]="Y"
List
# ⇒ ['A', 'B', 'Y', 'D', 'E']
タプル()
要素をカンマで区切って全体をカッコ()で囲む。->カッコ()無しでもOK
変更不可。インデックス(番号)で要素にアクセス(読み)出来る。
Tuple = ("A", "B", "C", "D", "E")
print(Tuple)
# ⇒ ('A', 'B', 'C', 'D', 'E')
Tuple = ("A", "B", "C", "D", "E")
print(Tuple[2])
# ⇒ C
Tuple = ("A", "B", "C", "D", "E")
Tuple[2]="Y"
Tuple
# ⇒ Error
# TypeError: 'tuple' object does not support item assignment
辞書 { }
key : valuのペアをカンマ区切って全体を中カッコ{ }で囲む。
変更可。リストに似ていますが要素へのアクセスはキーで行う。
Dictionary = {"USA":1, "JAPAN":2, "Germany":3}
Dictionary
# ⇒ {'USA': 1, 'JAPAN': 2, 'Germany': 3}
Dictionary = {"USA":1, "JAPAN":2, "Germany":3}
Dictionary["JAPAN"]
# ⇒ 2
集合 { }
set()関数を使うか、1個以上のカンマ区切りの値を波括弧で囲む。同じ要素を一つしか持てない。
変更可。要素へのアクセスはキーで行う。
SET = set(["USA","JAPAN","Germany","JAPAN"])
SET
# ⇒ {'Germany', 'JAPAN', 'USA'}
atomic = {'H','C','O','N'}
atomic
# ⇒ {'C', 'H', 'N', 'O'}
set("philadelphia")
# ⇒ {'a', 'd', 'e', 'h', 'i', 'l', 'p'}
リストから集合を作る
US_list = ["Philadelphia","New York","LA","Boston","New York"]
set(US_list)
# ⇒ {'Boston', 'LA', 'New York', 'Philadelphia'}
タプルから集合を作る
US_list2 = ("Philadelphia","New York","LA","Boston","New York")
set(US_list2)
# ⇒ {'Boston', 'LA', 'New York', 'Philadelphia'}
辞書をset()で囲むとキーだけが使われる
atomic_weight = {'H': 1.008, 'C': 12.01, 'O': 16, 'N': 14.01}
set(atomic_weight)
# ⇒ {'C', 'H', 'N', 'O'}
=====================================================================================
open()
open(file, mode=’r’, buffering=-1, encoding=None, errors=None, newline=None, closefd=True, opener=None)
- mode
文字 意味 'r'読み込み用に開く (デフォルト) 'w'書き込み用に開き、まずファイルを切り詰める 'x'排他的な生成に開き、ファイルが存在する場合は失敗する 'a'書き込み用に開き、ファイルが存在する場合には末尾に追記する 'b'バイナリモード 't'テキストモード (デフォルト) '+'更新用に開く (読み込み・書き込み用)
- buffering
- 0: バッファリングを無効化 (バイナリモードでのみ設定可能です)
- 1: 行単位でのバッファリング (テキストモードでの書き込み時のみ有効です)
- >1: 固定サイズのチャンクバッファに対するサイズをバイト単位で指定
- encoding
- テキストモードでのみ使用
- Pythonでサポートされているエンコーディングはどれでも使えます。
- errors
- エンコードやデコードでのエラーをどのように扱うかを指定
- バイナリモードでは使用できません
ファイル全体を文字列として読み込み: read()
with open(path) as f:
s = f.read()
print(s)
# line 1
# line 2
# line 3
ファイル全体をリストとして読み込み: readlines()
with open(path) as f:
l = f.readlines()
print(l)
# [‘line 1\n’, ‘line 2\n’, ‘line 3’]
ファイルを一行ずつ読み込み: readline()
with open(path) as f:
for s_line in f:
print(repr(s_line))
# ‘line 1\n’
# ‘line 2\n’
# ‘line 3’
文字列を書き込み: write()
path_w = ‘data/temp/test_w.txt’
s = ‘New file’
with open(path_w, mode=‘w’) as f:
f.write(s)
with open(path_w) as f:
print(f.read())
# New file
リストを書き込み: writelines()
l = [‘One’, ‘Two’, ‘Three’]
with open(path_w, mode=‘w’) as f:
f.writelines(l)
with open(path_w) as f:
print(f.read())
# OneTwoThree
リストの要素ごとに改行して書き込む(+演算子、joinなど)
with open(path_w, mode=‘w’) as f:
f.write(‘\n’.join(l))
with open(path_w) as f:
print(f.read())
# One
# Two
# Three
=====================================================================================