Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 12 additions & 3 deletions kafka/net/selector.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@

log = logging.getLogger(__name__)

def log_trace(msg, *args, **kwargs):
log.log(5, msg, *args, **kwargs)


MAX_TIMEOUT = 2147483

Expand Down Expand Up @@ -229,7 +232,7 @@ def _next_scheduled_timeout(self, now):

# Note: Windows select works w/ sockets only
def register_event(self, fileobj, event, task):
log.log(0, 'net.register_event: %s, %s, %s', fileobj, event, task)
log_trace('net.register_event: %s, %s, %s', fileobj, event, task)
if not isinstance(task, Task):
task = Task(task)
try:
Expand All @@ -244,7 +247,7 @@ def register_event(self, fileobj, event, task):
self._selector.register(fileobj, event, (task, None) if event == selectors.EVENT_READ else (None, task))

def unregister_event(self, fileobj, event):
log.log(0, 'net.unregister_event: %s, %s', fileobj, event)
log_trace('net.unregister_event: %s, %s', fileobj, event)
try:
key = self._selector.get_key(fileobj)
reader, writer = key.data
Expand Down Expand Up @@ -273,6 +276,7 @@ def poll(self, timeout_ms=None, future=None):
raise RuntimError('Recursive access to net.poll!')
elif self._running:
raise RuntimeError('Concurrent access to net.poll!')
log_trace('poll: enter')
self._running = True
start_at = time.monotonic()
inner_timeout = timeout_ms / 1000 if timeout_ms is not None else None
Expand All @@ -287,8 +291,10 @@ def poll(self, timeout_ms=None, future=None):
if inner_timeout <= 0:
break
self._running = False
log_trace('poll: exit')

def _poll_once(self, timeout=None):
log_trace('_poll_once: enter')
if self._ready:
timeout = 0
else:
Expand All @@ -304,13 +310,15 @@ def _poll_once(self, timeout=None):
timeout = 0

ready_events = self._selector.select(timeout)
log_trace('_poll_once: %d ready_events', len(ready_events))
self._process_events(ready_events)
self._schedule_tasks()

n = len(self._ready)
for i in range(n):
self._current = self._ready.popleft()
try:
log_trace('Calling task %s', self._current)
event = self._current()

except StopIteration:
Expand All @@ -321,14 +329,15 @@ def _poll_once(self, timeout=None):

else:
if isinstance(event, KernelEvent):
log.log(0, 'kernel event %s', event.method)
log_trace('kernel event %s', event.method)
getattr(self, event.method)(*event.args)
elif isinstance(event, Future):
event.add_both(lambda _, task=self._current: self.call_soon(task))
else:
raise RuntimeError('Unhandled event type: %s' % event)

self._current = None
log_trace('_poll_once: exit')

def wakeup(self):
try:
Expand Down
Loading