diff --git a/kafka/net/selector.py b/kafka/net/selector.py index 0198367cf..464e1db28 100644 --- a/kafka/net/selector.py +++ b/kafka/net/selector.py @@ -12,6 +12,9 @@ log = logging.getLogger(__name__) +def log_trace(msg, *args, **kwargs): + log.log(5, msg, *args, **kwargs) + MAX_TIMEOUT = 2147483 @@ -229,7 +232,7 @@ def _next_scheduled_timeout(self, now): # Note: Windows select works w/ sockets only def register_event(self, fileobj, event, task): - log.log(0, 'net.register_event: %s, %s, %s', fileobj, event, task) + log_trace('net.register_event: %s, %s, %s', fileobj, event, task) if not isinstance(task, Task): task = Task(task) try: @@ -244,7 +247,7 @@ def register_event(self, fileobj, event, task): self._selector.register(fileobj, event, (task, None) if event == selectors.EVENT_READ else (None, task)) def unregister_event(self, fileobj, event): - log.log(0, 'net.unregister_event: %s, %s', fileobj, event) + log_trace('net.unregister_event: %s, %s', fileobj, event) try: key = self._selector.get_key(fileobj) reader, writer = key.data @@ -273,6 +276,7 @@ def poll(self, timeout_ms=None, future=None): raise RuntimError('Recursive access to net.poll!') elif self._running: raise RuntimeError('Concurrent access to net.poll!') + log_trace('poll: enter') self._running = True start_at = time.monotonic() inner_timeout = timeout_ms / 1000 if timeout_ms is not None else None @@ -287,8 +291,10 @@ def poll(self, timeout_ms=None, future=None): if inner_timeout <= 0: break self._running = False + log_trace('poll: exit') def _poll_once(self, timeout=None): + log_trace('_poll_once: enter') if self._ready: timeout = 0 else: @@ -304,6 +310,7 @@ def _poll_once(self, timeout=None): timeout = 0 ready_events = self._selector.select(timeout) + log_trace('_poll_once: %d ready_events', len(ready_events)) self._process_events(ready_events) self._schedule_tasks() @@ -311,6 +318,7 @@ def _poll_once(self, timeout=None): for i in range(n): self._current = self._ready.popleft() try: + log_trace('Calling task %s', self._current) event = self._current() except StopIteration: @@ -321,7 +329,7 @@ def _poll_once(self, timeout=None): else: if isinstance(event, KernelEvent): - log.log(0, 'kernel event %s', event.method) + log_trace('kernel event %s', event.method) getattr(self, event.method)(*event.args) elif isinstance(event, Future): event.add_both(lambda _, task=self._current: self.call_soon(task)) @@ -329,6 +337,7 @@ def _poll_once(self, timeout=None): raise RuntimeError('Unhandled event type: %s' % event) self._current = None + log_trace('_poll_once: exit') def wakeup(self): try: