Server Hooks
on_starting
Default:
def on_starting(server):
pass
2022年1月5日大约 2 分钟
on_starting
Default:
def on_starting(server):
pass
import sys
import re
import os
import jinja2
type2type ={
"string": "str",
"int32" : "int",
"float": "float",
"double": "float",
"int64": "int",
"uint32": "int",
"uint64": "int",
"sint32": "int",
"sint64": "int",
"fixed32": "int",
"fixed64": "int",
"sfixed32": "int",
"sfixed64": "int",
"bool": "bool",
"bytes": "bytes"
}
base_type_set = set(type2type.values())
already_class = set()
imp_str = """#!/usr/bin/python3
# -*- coding:utf-8 -*-
import typing
from google.protobuf.message import Message
"""
message_tmplate='''
class {{name}}(Message):{% for col in cols %}
{{col.name}} {{col.typing}}{% endfor %}
'''
class Field:
def __init__(self, n="", t="") -> None:
self.name :str = n
self.typing :str = t
@classmethod
def parse_from_str(cls, field_str):
fields = re.findall(r"^\s*(\S+)\s+(\S+)\s*=\s*\d+\s*;",
field_str, flags=re.MULTILINE)
for f in fields:
t = type2type.get(f[0], f[0])
return cls(f[1], f": {t}")
fields = re.findall(r"^\s*optional\s+(\S+)\s+(\S+)\s*=\s*\d+\s*;",
field_str, flags=re.MULTILINE)
for f in fields:
t = type2type.get(f[0], f[0])
return cls(f[1], f": {t}")
fields = re.findall(r"^\s*repeated\s+(\S+)\s+(\S+)\s*=\s*\d+\s*;",
field_str, flags=re.MULTILINE)
for f in fields:
t = type2type.get(f[0], f[0])
return cls(f[1], f": typing.List [{t}]")
fields = re.findall(r"^\s*map<\s*(\S+)\s*,\s*(\S+)\s*>\s*(\S+)\s*=\s*\d+\s*;",
field_str, flags=re.MULTILINE)
for f in fields:
k = type2type.get(f[0], f[0])
v = type2type.get(f[1], f[1])
return cls(f[2], f": typing.Dict[{k}, {v}]")
class Message:
def __init__(self, n="", f=list()) -> None:
self.name : str = n
self.fiels : list[Field] = f
@classmethod
def parse_from_str(cls, msg_str):
name = re.findall(r"\s*message\s+(\S+)\s+{.*?}", msg_str, flags=re.DOTALL)
if name:
name = name[0]
fields = list()
results = re.findall(r"^\s*\S+.*?\s*=\s*\d+\s*;", msg_str, flags=re.MULTILINE)
for result in results:
fields.append(Field.parse_from_str(result))
return cls(name, fields)
def code_render(self):
template = jinja2.Template(message_tmplate)
return template.render(name=self.name, cols=self.fiels)
class Proto:
def __init__(self, n: str = "", m: list = list()) -> None:
self.imp :str = imp_str
self.name :str = n
self.messages :list[Message] = m
@classmethod
def parse_from_file(cls, name, *args, **kwargs):
proto_file = f"{name}.proto"
if not os.path.exists(proto_file):
raise Exception(f"{proto_file} not exist")
messages = list()
with open(proto_file, mode="r", encoding="utf-8") as fd:
proto_str = fd.read()
msgs = re.findall(r"\s*(message\s+(\S+)\s+{.*?})", proto_str, flags=re.DOTALL)
for msg in msgs:
already_class.add(msg[1])
for msg in msgs:
messages.append(Message.parse_from_str(msg[0]))
return cls(name, messages)
def render_to_file(self):
texts = [self.imp,]
for msg in self.messages:
texts.append(msg.code_render())
with open(f"{self.name}_pb2.pyi", mode="w", encoding="utf-8") as fd:
fd.write("\n".join(texts))
os.system(f"protoc --python_out=. {self.name}.proto")
if __name__ == "__main__":
proto = Proto.parse_from_file(*sys.argv[1:])
proto.render_to_file()
from gevent import spawn,joinall,monkey;monkey.patch_all()
import time
def task(pid):
"""
Some non-deterministic task
"""
time.sleep(2)
print('Task %s done' % pid)
def synchronous():
for i in range(3):
task(i)
def asynchronous():
g_l=[spawn(task,i) for i in range(3)]
joinall(g_l)
if __name__ == '__main__':
print('Synchronous:')
synchronous()
print('Asynchronous:')
asynchronous()
import posix_ipc
import selectors
# This program uses `posix_ipc` together with the `selectors`library from the
# Python standard library. `selectors` provides "high-level I/O multiplexing" akin to having an event library.
# The message queue is created as usual
mq = posix_ipc.MessageQueue("/python_ipc_test", flags=posix_ipc.O_CREAT,
max_messages=1024, max_message_size=1024 * 1024)
mq.block = False
# Function is defined to handle events on the queue
counter = 0
def accept(message_queue, mask):
global counter
(msg, prio) = message_queue.receive()
counter += 1
print(f" \r{len(msg)}, Priority:{prio},{counter}", end="")
# print("Message: ", msg)
# print("Priority: ", prio)
# The selector can now be created...
sel = selectors.DefaultSelector()
# ... and the message queue is registered. Other event sources could also be
# registered simultaneously, but for now we stick to the queue
sel.register(mq, selectors.EVENT_READ, accept)
# `.select()` will block until an event is triggered
while True:
try:
events = sel.select()
for key, mask in events:
# `.data` contains the third argument from `.register` above -- we use it for the callback.
callback = key.data
callback(key.fileobj, mask)
except Exception as e:
import traceback
print(traceback.format_exc())
break
finally:
""
# With the message successfully received, we can unlink and close.
mq.unlink()
mq.close()
用 top 的方式分析 Python 程序性能的工具。一款 Python 程序性能分析工具,它可以让你在不重启程序或修改代码的情况,直观地看到 Python 程序中每个函数花费的时间。
# 安装
pip install py-spy
# record 命令将配置文件记录到文件中,可用来生成火焰图
py-spy record -o profile.svg --pid 进程ID
# top 命令实时展示函数花费时间
py-spy top --pid 进程ID
# dump 命令显示每个 Python 线程的当前调用堆栈
py-spy dump --pid 进程ID
打开系统配置添加如下配置
sudo vim /etc/sysctl.conf
kernel.msgmax=1048576
kernel.msgmnb=1048576
单例模式很常见,实现有很多地方值得优化,具体的优化历程,此处不多做赘述。
谨在此记录一个本人认为完善的,单例实现。具体实现思路使用Python实现。
其他语言也可以借鉴思路。
import time
import threading
class Singleton(object):
_instance_lock = threading.Lock()
def __init__(self):
time.sleep(1)
@classmethod
def instance(cls, *args, **kwargs):
if not hasattr(Singleton, "_instance"):
with Singleton._instance_lock:
if not hasattr(Singleton, "_instance"):
Singleton._instance = Singleton(*args, **kwargs)
return Singleton._instance
这是一个使用python3开发的windows环境下,读取串口数据的桌面小工具。
python3有现成的第三方库,灵活方便,以上功能要求都能满足。
最后使用pyinstaller发布。
同时具备完备的日志记录和分析功能。
涉及Excel写、日志、用户剪切板操作、COM串口读写、GUI界面开发、发布等。
安装python3-devel
sudo apt -y install python38-devel
配置C语言includepath
环境变量
在 bashrc
或 bash_profile中添加环境变量,并使之生效
编写C代码,新建文件 spammoudle.c
#define PY_SSIZE_T_CLEAN
#include <Python.h>
#include <stdlib.h>
static PyObject *
spam_system(PyObject *self, PyObject *args)
{
const char *command;
int sts;
if (!PyArg_ParseTuple(args, "s", &command))
return NULL;
sts = system(command);
return PyLong_FromLong(sts);
}
static PyObject *SpamError;
static PyMethodDef SpamMethods[] = {
{"system", spam_system, METH_VARARGS,
"Execute a shell command."},
{NULL, NULL, 0, NULL} /* Sentinel */
};
static struct PyModuleDef spammodule = {
PyModuleDef_HEAD_INIT,
"spam", /* name of module */
"test module C", /* module documentation, may be NULL */
-1, /* size of per-interpreter state of the module,
or -1 if the module keeps state in global variables. */
SpamMethods
};
PyMODINIT_FUNC
PyInit_spam(void)
{
PyObject *m;
m = PyModule_Create(&spammodule);
if (m == NULL)
return NULL;
SpamError = PyErr_NewException("spam.error", NULL, NULL);
Py_XINCREF(SpamError);
if (PyModule_AddObject(m, "error", SpamError) < 0) {
Py_XDECREF(SpamError);
Py_CLEAR(SpamError);
Py_DECREF(m);
return NULL;
}
return m;
}
int
main(int argc, char *argv[])
{
wchar_t *program = Py_DecodeLocale(argv[0], NULL);
if (program == NULL) {
fprintf(stderr, "Fatal error: cannot decode argv[0]\n");
exit(1);
}
/* Add a built-in module, before Py_Initialize */
if (PyImport_AppendInittab("spam", PyInit_spam) == -1) {
fprintf(stderr, "Error: could not extend in-built modules table\n");
exit(1);
}
/* Pass argv[0] to the Python interpreter */
Py_SetProgramName(program);
/* Initialize the Python interpreter. Required.
If this step fails, it will be a fatal error. */
Py_Initialize();
/* Optionally import the module; alternatively,
import can be deferred until the embedded script
imports it. */
PyObject *pmodule = PyImport_ImportModule("spam");
if (!pmodule) {
PyErr_Print();
fprintf(stderr, "Error: could not import module 'spam'\n");
}
PyMem_RawFree(program);
return 0;
}
#!/usr/bin/python3
# -*- coding:utf-8 -*-
import IPy
import ipaddress
import logging
import traceback
logger = logging.getLogger()
trace_logger = logging.getLogger()
def merge_range_net(nets: list):
out_nets = list()
length = len(nets)
if length == 0:
return out_nets
i = 0
while i < length:
re_loop = False
net = nets[i]
if net is None:
i += 1
continue
for ii in range(i+1, length):
net1 = nets[ii]
if net1 is None:
continue
# 刚好拼接merge
if net[1] == net1[0]-1:
nets[ii] = None
nets[i] = [net[0], net1[1]]
re_loop = True
break
if net[0] - 1 == net1[1]:
nets[ii] = None
nets[i] = [net1[0], net[1]]
re_loop = True
break
# 不能拼接
if net[1] < net1[0]:
continue
if net[0] > net1[1]:
continue
# 重叠merge
nets[ii] = None
nets[i] = [min(net[0], net1[0]), max(net[1], net1[1])]
re_loop = True
break
if re_loop:
i = 0
else:
i += 1
for net in nets:
if net:
first_ip = ipaddress.IPv4Address(net[0])
last_ip = ipaddress.IPv4Address(net[1])
out_nets.extend(ipaddress.summarize_address_range(first_ip, last_ip))
return out_nets
def transfer2range(ip_desc : str):
if "range" in ip_desc:
res = ip_desc.split()
try:
res[1] = IPy.IP(res[1]).int()
res[2] = IPy.IP(res[2]).int()
return res[1:]
except Exception as e:
logger.error(f"{e}")
trace_logger.error(traceback.format_exc())
return
finally:
""
net = None
res = None
try:
if '/' in ip_desc:
net = ipaddress.IPv4Network(ip_desc.split("/")[0])
net = ipaddress.IPv4Network(ip_desc)
except Exception as e:
logger.error(f"{e}")
trace_logger.error(traceback.format_exc())
if not net:
return
finally:
""
try:
return [IPy.IP(str(net.network_address)).int(), IPy.IP(str(net.broadcast_address)).int() ]
except Exception as e:
logger.error(f"{e}")
trace_logger.error(traceback.format_exc())
return
finally:
""
def merge_ips(ip_desc: list):
nets = list()
for s in ip_desc:
net = transfer2range(s)
if net:
nets.append(net)
nets = merge_range_net(nets)
return nets
if __name__ == "__main__":
try:
ipdescs= ['1.1.1.1', '1.1.1.0', "range 1.1.1.1 1.1.1.4", '1.1.1.16/28', '1.1.1.20/31', '1.1.1.0/255.255.255.0']
print(merge_ips(ipdescs))
except Exception as e:
print(traceback.format_exc())