作者:國夫君
來源:見文末
ioloop
`ioloop`是`tornado`的核心模組,也是個排程模組,各種非同步事件都是由他排程的,所以必須弄清他的執行邏輯
原始碼分析
而`ioloop`的核心部分則是 `while True`這個迴圈內部的邏輯,貼上他的程式碼下
def start(self):
if self._running:
raise RuntimeError(“IOLoop is already running”)
self._setup_logging()
if self._stopped:
self._stopped = False
return
old_current = getattr(IOLoop._current, “instance”, None)
IOLoop._current.instance = self
self._thread_ident = thread.get_ident()
self._running = True
old_wakeup_fd = None
if hasattr(signal, ‘set_wakeup_fd’) and os.name == ‘posix’:
try:
old_wakeup_fd = signal.set_wakeup_fd(self._waker.write_fileno())
if old_wakeup_fd != –1:
signal.set_wakeup_fd(old_wakeup_fd)
old_wakeup_fd = None
except ValueError:
old_wakeup_fd = None
try:
while True:
with self._callback_lock:
callbacks = self._callbacks
self._callbacks = []
due_timeouts = []
if self._timeouts:
now = self.time()
while self._timeouts:
if self._timeouts[0].callback is None:
heapq.heappop(self._timeouts)
self._cancellations -= 1
elif self._timeouts[0].deadline <= now:
due_timeouts.append(heapq.heappop(self._timeouts))
else:
break
if (self._cancellations > 512
and self._cancellations > (len(self._timeouts) >> 1)):
self._cancellations = 0
self._timeouts = [x for x in self._timeouts
if x.callback is not None]
heapq.heapify(self._timeouts)
for callback in callbacks:
self._run_callback(callback)
for timeout in due_timeouts:
if timeout.callback is not None:
self._run_callback(timeout.callback)
callbacks = callback = due_timeouts = timeout = None
if self._callbacks:
poll_timeout = 0.0
elif self._timeouts:
poll_timeout = self._timeouts[0].deadline – self.time()
poll_timeout = max(0, min(poll_timeout, _POLL_TIMEOUT))
else:
poll_timeout = _POLL_TIMEOUT
if not self._running:
break
if self._blocking_signal_threshold is not None:
signal.setitimer(signal.ITIMER_REAL, 0, 0)
try:
event_pairs = self._impl.poll(poll_timeout)
except Exception as e:
if errno_from_exception(e) == errno.EINTR:
continue
else:
raise
if self._blocking_signal_threshold is not None:
signal.setitimer(signal.ITIMER_REAL,
self._blocking_signal_threshold, 0)
self._events.update(event_pairs)
while self._events:
fd, events = self._events.popitem()
try:
fd_obj, handler_func = self._handlers[fd]
handler_func(fd_obj, events)
except (OSError, IOError) as e:
if errno_from_exception(e) == errno.EPIPE:
pass
else:
self.handle_callback_exception(self._handlers.get(fd))
except Exception:
self.handle_callback_exception(self._handlers.get(fd))
fd_obj = handler_func = None
finally:
self._stopped = False
if self._blocking_signal_threshold is not None:
signal.setitimer(signal.ITIMER_REAL, 0, 0)
IOLoop._current.instance = old_current
if old_wakeup_fd is not None:
signal.set_wakeup_fd(old_wakeup_fd)
除去註釋,程式碼其實沒多少行. 由while 內部程式碼可以看出ioloop主要由三部分組成:
1.回呼 callbacks
他是ioloop回呼的基礎部分,透過IOLoop.instance().add_callback()新增到self._callbacks,他們將在每一次loop中被執行.
主要用途是將邏輯分塊,在適合時機將包裝好的callback新增到self._callbacks讓其執行.
例如ioloop中的add_future
def add_future(self, future, callback):
“””Schedules a callback on the “IOLoop“ when the given
`.Future` is finished.
The callback is invoked with one argument, the
`.Future`.
“””
assert is_future(future)
callback = stack_context.wrap(callback)
future.add_done_callback(
lambda future: self.add_callback(callback, future))
future物件得到result的時候會呼叫future.add_done_callback新增的callback,再將其轉至ioloop執行
2.定時器 due_timeouts
這是定時器,在指定的事件執行callback.
跟1中的callback類似,透過IOLoop.instance().add_callback在每一次迴圈,會計算timeouts回呼串列裡的事件,執行已到期的callback.
當然不是無節操的迴圈.
因為poll操作會阻塞到有io操作發生,所以只要計算最近的timeout, 然後用這個時間作為self._impl.poll(poll_timeout) 的 poll_timeout ,就可以達到按時運行了
但是,假設poll_timeout的時間很大時,self._impl.poll一直在堵塞中(沒有io事件,但在處理某一個io事件), 那新增剛才1中的callback不是要等很久才會被執行嗎? 答案當然是不會.
ioloop中有個waker物件,他是由兩個fd組成,一個讀一個寫.
ioloop在初始化的時候把waker系結到epoll裡了,add_callback時會觸發waker的讀寫.
這樣ioloop就會在poll中被喚醒了,接著就可以及時處理timeout callback了
用這樣的方式也可以自己封裝一個小的定時器功能玩玩
3.io事件的event loop
處理epoll事件的功能
透過IOLoop.instance().add_handler(fd, handler, events)系結fd event的處理事件
在httpserver.listen的程式碼內,
netutil.py中的netutil.py的add_accept_handler系結accept handler處理客戶端接入的邏輯
如法炮製,其他的io事件也這樣系結,業務邏輯的分塊交由ioloop的callback和future處理
關於epoll的用法的內容.詳情見我第一篇文章吧,哈哈
總結
ioloop由callback(業務分塊), timeout callback(定時任務) io event(io傳輸和解析) 三塊組成,互相配合完成非同步的功能,構建gen,httpclient,iostream等功能。
串聯大致的流程是,tornado 系結io event,處理io傳輸解析,傳輸完成後(結合Future)回呼(callback)業務處理的邏輯和一些固定操作 . 定時器則是較為獨立的模組
Futrue
個人認為Future是tornado僅此ioloop重要的模組,他貫穿全文,所有非同步操作都有他的身影。顧名思義,他主要是關註日後要做的事,類似jquery的Deferred吧
一般的用法是透過ioloop的add_future定義future的done callback, 當future被set_result的時候,future的done callback就會被呼叫. 從而完成Future的功能.
具體可以參考gen.coroutine的實現,本文後面也會講到
他的組成不複雜,只有幾個重要的方法,最重要的是 add_done_callback , set_result
tornado用Future和ioloop,yield實現了gen.coroutine
1. add_done_callback
跟ioloop的callback類似 , 儲存事件完成後的callback在self._callbacks裡
def add_done_callback(self, fn):
if self._done:
fn(self)
else:
self._callbacks.append(fn)
2.set_result
設定事件的結果,並執行之前儲存好的callback
def set_result(self, result):
self._result = result
self._set_done()
def _set_done(self):
self._done = True
for cb in self._callbacks:
try:
cb(self)
except Exception:
app_log.exception(‘Exception in callback %r for %r’,
cb, self)
self._callbacks = None
為了驗證之前所說的,上一段測試程式碼
#! /usr/bin/env python
#coding=utf-8
import tornado.web
import tornado.ioloop
from tornado.gen import coroutine
from tornado.concurrent import Future
def test():
def pp(s):
print s
future = Future()
iol = tornado.ioloop.IOLoop.instance()
print ‘init future %s’%future
iol.add_future(future, lambda f: pp(‘ioloop callback after future done,future is %s’%f))
#模擬io延遲操作
iol.add_timeout(iol.time()+5,lambda:future.set_result(‘set future is done’))
print ‘init complete’
tornado.ioloop.IOLoop.instance().start()
if __name__ == “__main__”:
test()
執行結果:
gen.coroutine
接著繼續延伸,看看coroutine的實現。
gen.coroutine實現的功能其實是將原來的callback的寫法,用yield的寫法代替. 即以yield為分界,將程式碼分成兩部分.
如:
#! /usr/bin/env python
#coding=utf-8
import tornado.ioloop
from tornado.gen import coroutine
from tornado.httpclient import AsyncHTTPClient
@coroutine
def cotest():
client = AsyncHTTPClient()
res = yield client.fetch(“http://www.segmentfault.com/”)
print res
if __name__ == “__main__”:
f = cotest()
print f #這裡傳回了一個future哦
tornado.ioloop.IOLoop.instance().start()
執行結果:
原始碼分析
接下來分析下coroutine的實現
def _make_coroutine_wrapper(func, replace_callback):
@functools.wraps(func)
def wrapper(*args, **kwargs):
future = TracebackFuture()
if replace_callback and ‘callback’ in kwargs:
callback = kwargs.pop(‘callback’)
IOLoop.current().add_future(
future, lambda future: callback(future.result()))
try:
result = func(*args, **kwargs)
except (Return, StopIteration) as e:
result = getattr(e, ‘value’, None)
except Exception:
future.set_exc_info(sys.exc_info())
return future
else:
if isinstance(result, types.GeneratorType):
try:
orig_stack_contexts = stack_context._state.contexts
yielded = next(result)
if stack_context._state.contexts is not orig_stack_contexts:
yielded = TracebackFuture()
yielded.set_exception(
stack_context.StackContextInconsistentError(
‘stack_context inconsistency (probably caused ‘
‘by yield within a “with StackContext” block)’))
except (StopIteration, Return) as e:
future.set_result(getattr(e, ‘value’, None))
except Exception:
future.set_exc_info(sys.exc_info())
else:
Runner(result, future, yielded)
try:
return future
finally:
future = None
future.set_result(result)
return future
return wrapper
如原始碼所示,func執行的結果是GeneratorType ,yielded = next(result), 執行至原函式的yield位置,傳回的是原函式func內部 yield 右邊傳回的物件(必須是Future或Future的list)給yielded.經過Runner(result, future, yielded) 對yielded進行處理.在此就 貼出Runner的程式碼了.
Runner初始化過程,呼叫handle_yield, 檢視yielded是否已done了,否則add_future執行Runner的run方法, run方法中如果yielded物件已完成,用對它的gen呼叫send,傳送完成的結果.
所以yielded在什麼地方被set_result非常重要, 當被set_result的時候,才會send結果給原func,完成整個非同步操作
詳情可以檢視tornado 中重要的物件 iostream,原始碼中iostream的 _handle_connect,如此設定了連線的result.
def _handle_connect(self):
err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
if err != 0:
self.error = socket.error(err, os.strerror(err))
if self._connect_future is None:
gen_log.warning(“Connect error on fd %s: %s”,
self.socket.fileno(), errno.errorcode[err])
self.close()
return
if self._connect_callback is not None:
callback = self._connect_callback
self._connect_callback = None
self._run_callback(callback)
if self._connect_future is not None:
future = self._connect_future
self._connect_future = None
future.set_result(self)
self._connecting = False
最後貼上一個簡單的測試程式碼,演示coroutine,future的用法
import tornado.ioloop
from tornado.gen import coroutine
from tornado.concurrent import Future
@coroutine
def asyn_sum(a, b):
print(“begin calculate:sum %d+%d”%(a,b))
future = Future()
future2 = Future()
iol = tornado.ioloop.IOLoop.instance()
print future
def callback(a, b):
print(“calculating the sum of %d+%d:”%(a,b))
future.set_result(a+b)
iol.add_timeout(iol.time()+3,lambda f:f.set_result(None),future2)
iol.add_timeout(iol.time()+3,callback, a, b)
result = yield future
print(“after yielded”)
print(“the %d+%d=%d”%(a, b, result))
yield future2
print ‘after future2’
def main():
f = asyn_sum(2,3)
print ”
print f
tornado.ioloop.IOLoop.instance().start()
if __name__ == “__main__”:
main()
執行結果:
為什麼程式碼中個yield都起作用了? 因為Runner.run裡,最後繼續用handle_yield處理了send後傳回的yielded物件,意思是func裡可以有n幹個yield操作
if not self.handle_yield(yielded):
return
總結
至此,已完成tornado中重要的幾個模組的流程,其他模組也是由此而來.寫了這麼多,越寫越卡,就到此為止先吧。
來源:國夫君
segmentfault.com/a/1190000002971992
《Linux雲端計算及運維架構師高薪實戰班》2018年09月16日即將開課中,120天衝擊Linux運維年薪30萬,改變速約~~~~
*宣告:推送內容及圖片來源於網路,部分內容會有所改動,版權歸原作者所有,如來源資訊有誤或侵犯權益,請聯絡我們刪除或授權事宜。
– END –
更多Linux好文請點選【閱讀原文】哦
↓↓↓