需求设定:
家里的宽带最近非常不稳定,师傅上门后,总感觉还是有问题。写了个脚本,每隔N秒连一下网,看看有没有掉网。实现方案:
python3,联网使用requests库, 使用自带的logging模块,发现连接不上,写入到logging.log文件中 为了实验多进程,特意将联网专门启动了一个子进程: 父进程:启动子进程,子进程中获得消息,因为从子进程中获取消息的方法是阻塞的,得单独启动一个线程,对收到的信息进行loging 子进程:每隔N秒联网,发现连接不上,就向主进程发送消息。同时启动一个线程来接收主进程相关的消息 进程中通信,使用pipe的方式,pipe()返回一对连接对象,代表了pipe的两端。每个对象都有send()和recv()方法。代码如下:
# -*- encoding:utf-8 -*-import requestsimport multiprocessingimport timeimport threadingimport loggingimport os#初始化Log,将Log记录到def iniLog(): logfile = 'logging.log' log_format = '%(filename)s [%(asctime)s] [%(levelname)s] %(message)s' #todox jeig support utf8 encode handler = logging.FileHandler(logfile, "a",encoding = "UTF-8") formatter = logging.Formatter(log_format) handler.setFormatter(formatter) root_logger = logging.getLogger() root_logger.addHandler(handler) root_logger.setLevel(logging.INFO)#子进程接收消息def recv_message_child(conn): while True: print('子进程正在等待消息') print('获得消息:'+conn.recv())#父进程接收消息def recv_message_parent(conn): while True: print('父进程正在等待消息') print('获得消息:'+conn.recv()) logging.info(conn.recv())#尝试联网的时间间隔,以秒为单位TRY_TIME_SPACE_SECCOND = 15#子进程监测网络,每隔TRY_TIME_SPACE_SECCOND秒来访问下百度def checkNet_process(conn): #打印进程号 print("当前进程ID:%s,父进程ID:%s" % (os.getpid(),os.getppid())) # 子进程启动线程,来接收父进程发送的消息 t = threading.Thread(target=recv_message_child,args=(conn,)) t.setDaemon(True) t.start() #t.join() headers={ 'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:43.0) Gecko/20100101 Firefox/43.0', } while True: print('child process is running') try: response = requests.get('http://www.baidu.com',headers=headers) print('返回码:'+str(response.status_code)) print(type(response.status_code)) if response.status_code != 200: #do some task print('+++++++++++++++++++++++++++++++++++++++网络无法访问,状态码:{0}'.format(response.status_code)) conn.send('网络无法访问,状态码:{0}'.format(response.status_code)) else: print('网络正常访问') except Exception as e: print('网络异常报错') conn.send('网络报错:'+str(e)) finally: time.sleep(TRY_TIME_SPACE_SECCOND)if __name__ == '__main__': iniLog() logging.info('net check start!') #开辟两个口,都是能进能出,括号中如果False即单向通信 conn_parent,conn_child=multiprocessing.Pipe() p=multiprocessing.Process(target=checkNet_process,args=(conn_child,)) #子进程使用sock口,调用checkNet_process函数 p.start() #父进程启动线程,来接收子进程发送的消息 t = threading.Thread(target=recv_message_parent, args=(conn_parent,)) t.setDaemon(True) t.start() t.join()