1.整体架构

整体思路较为简单,主要包括http服务模块和任务管理模块。http服务模块监听指定端口,等待后端的POST请求,收到POST请求后,返回OK给后端并添加计算任务到阻塞队列。任务管理模块并发处理任务队列中的任务,处理完成后回调后端,返回结果。

本项目基于python3的hhtp.server库实现。

2.代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
"""
@author: ygq65536
简单易用的深度学习算法HTTP服务器程序
"""
import http.server
import json
import algorithm_process # 深度学习算法接口
import numpy as np
from urllib.request import urlopen
import cv2
import requests
from multiprocessing import Process
from queue import Queue
import logging


def task_process(img_url, task_id, callback_url):
"""下载图片,执行算法,整理结果,回调后端
"""
try:
resp = urlopen(img_url)
image = np.asarray(bytearray(resp.read()), dtype="uint8")
image = cv2.imdecode(image, cv2.IMREAD_COLOR)
result = algorithm_process(image)
result["task_id"] = task_id
data_json = json.dumps(result) #dumps:将python对象解码为json数据
headers = {"Content-Type":"application/json",
"appKey":"XXX",
"appSecret":"XXXXXXXXXX"}
callback_resp = requests.post(callback_url, data_json, headers=headers)

except Exception as ex:
return

def task_manage(q):
"""队列有任务则调用算法模块处理,否则阻塞
"""
while True:
try:
datas = q.get(block=True)
task_id = datas["task_id"]
img_url = datas["img_url"]
callback_url = datas["callback_url"]
task_process(img_url, task_id, callback_url)
except Exception as ex:
continue

class algorithm_server(http.server.BaseHTTPRequestHandler):
"""http服务类 监听并异步响应post调用
"""
def setup(self):
self.request.settimeout(10)
http.server.BaseHTTPRequestHandler.setup(self)

def _set_response(self):
self.send_response(200)
self.send_header('Content-type', 'text/html')
self.end_headers()

def do_GET(self):
buf = 'XXXX'
self.protocal_version = 'HTTP/1.1'
self._set_response()
buf = bytes(buf, encoding="utf-8")
self.wfile.write(buf)

def do_POST(self):
'''
处理通过POST方式传递过来的数据(放入全局任务队列)
异步调用模型得到结果并返回
'''
path = self.path
#获取post提交的数据
datas = self.rfile.read(int(self.headers['content-length']))
datas = datas.decode('utf-8')
datas = json.loads(datas)
if "task_id" in datas:
task_id = datas["task_id"]
else:
buf = '403'
buf = bytes(buf, encoding="utf-8")
self.wfile.write(buf)
return
if "img_url" in datas:
img_url = datas["img_url"]
else:
buf = '403'
buf = bytes(buf, encoding="utf-8")
self.wfile.write(buf)
return
if "callback_url" in datas:
callback_url = datas["callback_url"]
else:
buf = '403'
buf = bytes(buf, encoding="utf-8")
self.wfile.write(buf)
return
buf = '200'
self._set_response()
buf = bytes(buf, encoding="utf-8")
self.wfile.write(buf)
task_queue.put(datas)
return

def start_server(ip, port):
http_server = http.server.HTTPServer((ip, int(port)), algorithm_server)
try:
http_server.serve_forever() #设置一直监听并接收请求
except KeyboardInterrupt:
pass
http_server.server_close()

if __name__ == '__main__':
# 全局任务队列
process_num = 8
task_queue = Queue(maxsize=0)
for _ in range(process_num):
Process(target=task_manage, args=(task_queue,)).start()
start_server('0.0.0.0', 9753) # For IPv4 Network Only