Python3 threading.Thread 如何线程间通信 queque
发布时间:2017-11-30T15:38:29:手机请访问

由于GIL的存在,多线程进行cpu密集型操作并不能提高执行效率(针对 python2),我们修改
程序构架
1 使用多个DownloadThread
线程进行下载(I/O操作)
2 使用一个ConvertThread
线程进行转换(CPU操作)
3 下载线程把下载数据 安全地传递给转换线程
解决方案:
使用标准库中的Queue.Queue
,它是一个线程安全的队列
Download线程把下载数据放入队列 ,Convert
线程从队列里提取数据
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 |
# -*- coding: utf-8 -*- import csv from xml.etree.cElementTree import ElementTree, Element import requests from io import StringIO # 以下为生产者-消费者模型 # 一个进程的多个线程使用的是同一个地址空间 # 实现下载线程 from queue import Queue class DownloadThread(Thread): def __init__(self, sid, queue): Thread.__init__(self) self.sid = sid self.url = 'http://table.finance.yahoo.com/table.csv?s=%s.sz' self.url %= str(sid).rjust(6, '0') self.queue = queue @staticmethod def download(url): response = requests.get(url, timeout=3) if response.ok: return (StringIO(str(response.content))) # 返回一个支持文件操作的内存对象 def run(self): print('Downloading ... (%d)' % self.sid) # 1下载 data = self.download(self.url) # 2 将数据传给转换线程 sid, data # 加锁安全 self.queue.put((self.sid, data)) class ConvertThread(Thread): def __init__(self, queue): Thread.__init__(self) self.queue = queue def csvToXml(self, scsv, fxml): reader = csv.reader(scsv) headers = next(reader) headers = map(lambda h: h.replace(' ', ''), headers) root = Element('Data') for row in reader: eRow = Element('Row') root.append(eRow) for tag, text in zip(headers, row): e = Element(tag) e.text = text eRow.append(e) et = ElementTree(root) et.write(fxml) def run(self): while True: sid, data = self.queue.get() print('Converting to XML ... (%d)' % sid) if sid == -1: break if data: fname = str(sid).rjust(6, '0') + '.xml' with open(fname, 'wb') as wf: self.csvToXml(data, wf) if __name__ == '__main__': q = Queue() dThreads = [DownloadThread(i, q) for i in range(1, 11)] cThreads = ConvertThread(q) for t in dThreads: t.start() cThreads.start() for t in dThreads: t.join() # http://blog.csdn.net/u013679490/article/details/54767220 q.put((-1, None)) |
