[quagga-dev 11415] Re: use poll instead of select

Timo Teras timo.teras at iki.fi
Thu Aug 14 10:05:01 BST 2014


On Wed, 13 Aug 2014 08:53:33 +0200
Hannes Hofer <hannes.hofer at gmail.com> wrote:

> Apparently an ugly disclaimer is added automatically by my employer.
> Here I send the patch again; credit still goes to Barracuda Networks.

Do you have some numbers on how the new code works better?

A few notes on the memory allocation as well. I have not looked at the
rest of the code in detail yet.

> diff --git lib/thread.c lib/thread.c
> index 468edd9..55a677c 100644
> --- lib/thread.c
> +++ lib/thread.c
> @@ -531,7 +531,18 @@ thread_master_create ()
>        = hash_create ((unsigned int (*) (void *))cpu_record_hash_key,
>  		     (int (*) (const void *, const void *))cpu_record_hash_cmp);
>  
> +  /* initialize thread_master and thread_master pfds
> +     return NULL when malloc fails */
>    rv = XCALLOC (MTYPE_THREAD_MASTER, sizeof (struct thread_master));
> +  if (rv == NULL)
> +    return NULL;
> +
> +  rv->pfd.pfds = (struct pollfd *) malloc(sizeof(struct pollfd) * rv->pfd.pfdsize);
> +
> +  if (rv->pfd.pfds == NULL)
> +    return NULL;

The error path leaks 'rv'. Also, would it be possible to use the Quagga
memory allocation routines, so that these allocations are included in
the memory usage dumps?

> +
> +  memset(rv->pfd.pfds, 0, sizeof(struct pollfd) * rv->pfd.pfdsize);

malloc+memset = calloc. Use XCALLOC.

>    /* Initialize the timer queues */
>    rv->timer = pqueue_create();
> @@ -624,7 +635,8 @@ thread_master_free (struct thread_master *m)
>    thread_list_free (m, &m->ready);
>    thread_list_free (m, &m->unuse);
>    thread_queue_free (m, m->background);
> -  
> +  free(m->pfd.pfds);
> +
>    XFREE (MTYPE_THREAD_MASTER, m);
>  
>    if (cpu_record)
> @@ -712,6 +724,75 @@ thread_get (struct thread_master *m, u_char type,
>    return thread;
>  }
>  
> +/* alloc new space for pollfds. double size of pollfds */
> +static short
> +realloc_pfds (struct thread_master *m, int fd)
> +{
> +  size_t oldpfdlen = m->pfd.pfdsize * sizeof(struct pollfd);
> +  m->pfd.pfdsize *= 2;
> +  void *newpfd = NULL;
> +  newpfd = realloc(m->pfd.pfds, m->pfd.pfdsize * sizeof(struct
> pollfd));
> +  if (newpfd == NULL)
> +    {
> +      close(fd);
> +      zlog (NULL, LOG_ERR, "failed to allocate space for pollfds");
> +      return 0;
> +    }
> +  memset((struct pollfd*)newpfd + (m->pfd.pfdsize / 2), 0,
> oldpfdlen);
> +  m->pfd.pfds = (struct pollfd*)newpfd;
> +  return 1;
> +}
> +
> +/* generic add thread function */
> +struct thread *
> +generic_thread_add(struct thread_master *m, int (*func) (struct
> thread *),
> +		   void *arg, int fd, const char* funcname, short
> int event) +{
> +  struct thread *thread, *next;
> +  u_char type = THREAD_READ;
> +  assert (m != NULL);
> +  nfds_t queuepos = m->pfd.pfdcount;
> +  nfds_t i=0;
> +  for (i=0;i<m->pfd.pfdcount;i++)
> +    if (m->pfd.pfds[i].fd == fd)
> +      {
> +        queuepos = i;
> +        break;
> +      }
> +
> +  if (event & POLLOUT)
> +    {
> +      thread = (&m->write)->head;
> +      type = THREAD_WRITE;
> +    }
> +  else
> +    thread = (&m->read)->head;
> +
> +  /* check if fd is already present */
> +  for (thread; thread; thread = next)
> +    {
> +      next = thread->next;
> +      if (thread->u.fd == fd)
> +        {
> +          zlog (NULL, LOG_WARNING, "There is already fd [%d]", fd);
> +          return NULL;
> +        }
> +    }
> +
> +  /* is there enough space for a new fd? */
> +  if (queuepos >= m->pfd.pfdsize)
> +    if (realloc_pfds(m, fd) == 0)
> +      return NULL;
> +
> +  thread = thread_get (m, type, func, arg, funcname);
> +  m->pfd.pfds[queuepos].fd = fd;
> +  m->pfd.pfds[queuepos].events |= event;
> +  if (queuepos == m->pfd.pfdcount)
> +    m->pfd.pfdcount++;
> +
> +  return thread;
> +}
> +
>  /* Add new read thread. */
>  struct thread *
>  funcname_thread_add_read (struct thread_master *m, 
> @@ -721,14 +802,11 @@ funcname_thread_add_read (struct thread_master
> *m, 
>    assert (m != NULL);
>  
> -  if (FD_ISSET (fd, &m->readfd))
> -    {
> -      zlog (NULL, LOG_WARNING, "There is already read fd [%d]", fd);
> -      return NULL;
> -    }
> +  thread = generic_thread_add(m, func, arg, fd, funcname, (POLLIN |
> POLLHUP)); +
> +  if (thread == NULL)
> +    return NULL;
>  
> -  thread = thread_get (m, THREAD_READ, func, arg, funcname);
> -  FD_SET (fd, &m->readfd);
>    thread->u.fd = fd;
>    thread_list_add (&m->read, thread);
>  
> @@ -744,14 +822,12 @@ funcname_thread_add_write (struct thread_master
> *m, 
>    assert (m != NULL);
>  
> -  if (FD_ISSET (fd, &m->writefd))
> -    {
> -      zlog (NULL, LOG_WARNING, "There is already write fd [%d]", fd);
> -      return NULL;
> -    }
>  
> -  thread = thread_get (m, THREAD_WRITE, func, arg, funcname);
> -  FD_SET (fd, &m->writefd);
> +  thread = generic_thread_add(m, func, arg, fd, funcname, (POLLOUT |
> POLLHUP)); +
> +  if (thread == NULL)
> +    return NULL;
> +
>    thread->u.fd = fd;
>    thread_list_add (&m->write, thread);
>  
> @@ -872,16 +948,33 @@ thread_cancel (struct thread *thread)
>    struct thread_list *list = NULL;
>    struct pqueue *queue = NULL;
>    
> +  nfds_t i;
>    switch (thread->type)
>      {
>      case THREAD_READ:
> -      assert (FD_ISSET (thread->u.fd, &thread->master->readfd));
> -      FD_CLR (thread->u.fd, &thread->master->readfd);
> +      for (i=0;i<thread->master->pfd.pfdcount;++i)
> +        if (thread->master->pfd.pfds[i].fd == thread->u.fd)
> +          {
> +              /* remove thread fds from pfd list */
> +              memmove(thread->master->pfd.pfds+i,
> +                      thread->master->pfd.pfds+i+1,
> +                      (thread->master->pfd.pfdsize-i-1) *
> sizeof(struct pollfd));
> +              i--;
> +              thread->master->pfd.pfdcount--;
> +          }
>        list = &thread->master->read;
>        break;
>      case THREAD_WRITE:
> -      assert (FD_ISSET (thread->u.fd, &thread->master->writefd));
> -      FD_CLR (thread->u.fd, &thread->master->writefd);
> +      for (i=0;i<thread->master->pfd.pfdcount;++i)
> +        if (thread->master->pfd.pfds[i].fd == thread->u.fd)
> +          {
> +              /* remove thread fds from pfd list */
> +              memmove(thread->master->pfd.pfds+i,
> +                      thread->master->pfd.pfds+i+1,
> +                      (thread->master->pfd.pfdsize-i-1) *
> sizeof(struct pollfd));
> +              i--;
> +              thread->master->pfd.pfdcount--;
> +          }
>        list = &thread->master->write;
>        break;
>      case THREAD_TIMER:
> @@ -987,7 +1080,7 @@ thread_run (struct thread_master *m, struct
> thread *thread, }
>  
>  static int
> -thread_process_fd (struct thread_list *list, fd_set *fdset, fd_set
> *mfdset) +thread_process_fd (struct thread_list *list, short int
> state, int pos) {
>    struct thread *thread;
>    struct thread *next;
> @@ -999,13 +1092,12 @@ thread_process_fd (struct thread_list *list,
> fd_set *fdset, fd_set *mfdset) {
>        next = thread->next;
>  
> -      if (FD_ISSET (THREAD_FD (thread), fdset))
> +      if (thread->u.fd == thread->master->pfd.pfds[pos].fd)
>          {
> -          assert (FD_ISSET (THREAD_FD (thread), mfdset));
> -          FD_CLR(THREAD_FD (thread), mfdset);
>            thread_list_delete (list, thread);
>            thread_list_add (&thread->master->ready, thread);
>            thread->type = THREAD_READY;
> +          thread->master->pfd.pfds[pos].events &= ~(state);
>            ready++;
>          }
>      }
> @@ -1051,6 +1143,70 @@ thread_process (struct thread_list *list)
>    return ready;
>  }
>  
> +/* add snmp fds to poll set */
> +void
> +add_snmp_pollfds(struct thread_master *m, fd_set *snmpfds, int
> fdsetsize) +{
> +  int i;
> +  m->pfd.pfdcountsnmp = m->pfd.pfdcount;
> +  /* cycle trough fds and add neccessary fds to poll set */
> +  for (i=0;i<fdsetsize;++i)
> +    {
> +      if (FD_ISSET(i, snmpfds))
> +        {
> +          if (m->pfd.pfdcountsnmp > m->pfd.pfdsize)
> +            if (realloc_pfds(m, i) < 0)
> +              return;
> +
> +          m->pfd.pfds[m->pfd.pfdcountsnmp].fd = i;
> +          m->pfd.pfds[m->pfd.pfdcountsnmp].events = POLLIN;
> +          m->pfd.pfdcountsnmp++;
> +        }
> +    }
> +}
> +
> +/* check poll events */
> +void
> +check_pollfds(struct thread_master *m, fd_set *readfd)
> +{
> +  nfds_t i = 0;
> +  for (i = 0;i < m->pfd.pfdcount; ++i)
> +    {
> +      /* no event for current fd? immideatly continue */
> +      if(m->pfd.pfds[i].revents == 0)
> +        continue;
> +
> +      /* remove fd from list on POLLNVAL */
> +      if (m->pfd.pfds[i].revents & POLLNVAL)
> +        {
> +           memmove(m->pfd.pfds+i,
> +                   m->pfd.pfds+i+1,
> +                   (m->pfd.pfdsize-i-1) * sizeof(struct pollfd));
> +           m->pfd.pfdcount--;
> +           i--;
> +           continue;
> +        }
> +
> +      /* POLLIN / POLLOUT process event */
> +      if (m->pfd.pfds[i].revents & POLLIN)
> +        thread_process_fd(&m->read, POLLIN, i);
> +
> +      if (m->pfd.pfds[i].revents & POLLOUT)
> +        thread_process_fd(&m->write, POLLOUT, i);
> +
> +      /* remove fd from list on POLLHUP after other event is
> processed */
> +      if (m->pfd.pfds[i].revents & POLLHUP)
> +        {
> +           memmove(m->pfd.pfds+i,
> +                   m->pfd.pfds+i+1,
> +                   (m->pfd.pfdsize-i-1) * sizeof(struct pollfd));
> +           m->pfd.pfdcount--;
> +           i--;
> +        }
> +      else
> +          m->pfd.pfds[i].revents = 0;
> +    }
> +}
>  
>  /* Fetch next ready thread. */
>  struct thread *
> @@ -1091,11 +1247,6 @@ thread_fetch (struct thread_master *m, struct
> thread *fetch) /* Normal event are the next highest priority.  */
>        thread_process (&m->event);
>        
> -      /* Structure copy.  */
> -      readfd = m->readfd;
> -      writefd = m->writefd;
> -      exceptfd = m->exceptfd;
> -      
>        /* Calculate select wait timer if nothing else to do */
>        if (m->ready.count == 0)
>          {
> @@ -1123,12 +1274,22 @@ thread_fetch (struct thread_master *m, struct
> thread *fetch) snmpblock = 0;
>                memcpy(&snmp_timer_wait, timer_wait, sizeof(struct
> timeval)); }
> +          /* clear fdset since there are no other fds in fd_set,
> +             then add injected fds from snmp_select_info into
> pollset */
> +          FD_ZERO(&readfd);
>            snmp_select_info(&fdsetsize, &readfd, &snmp_timer_wait,
> &snmpblock);
> +          add_snmp_pollfds(m, &readfd, fdsetsize);
>            if (snmpblock == 0)
>              timer_wait = &snmp_timer_wait;
>          }
>  #endif
> -      num = select (FD_SETSIZE, &readfd, &writefd, &exceptfd,
> timer_wait);
> +      /* recalc timeout for poll. Attention NULL pointer is no
> timeout with
> +         select, where with poll no timeount is -1 */
> +      int timeout = -1;
> +      if (timer_wait != NULL)
> +        timeout = (timer_wait->tv_sec*1000) +
> (timer_wait->tv_usec/1000); +
> +      num = poll (m->pfd.pfds, m->pfd.pfdcount +
> m->pfd.pfdcountsnmp, timeout); 
>        /* Signals should get quick treatment */
>        if (num < 0)
> @@ -1140,6 +1301,14 @@ thread_fetch (struct thread_master *m, struct
> thread *fetch) }
>  
>  #if defined HAVE_SNMP && defined SNMP_AGENTX
> +      /* re-enter pollfds in fd_set for handling in snmp_read */
> +      FD_ZERO(&readfd);
> +      nfds_t i;
> +      for (i = m->pfd.pfdcount; i < m->pfd.pfdcountsnmp; ++i)
> +        {
> +          if (m->pfd.pfds[i].revents == POLLIN)
> +            FD_SET(m->pfd.pfds[i].fd, &readfd);
> +        }
>        if (agentx_enabled)
>          {
>            if (num > 0)
> @@ -1162,10 +1331,7 @@ thread_fetch (struct thread_master *m, struct
> thread *fetch) /* Got IO, process it */
>        if (num > 0)
>          {
> -          /* Normal priority read thead. */
> -          thread_process_fd (&m->read, &readfd, &m->readfd);
> -          /* Write thead. */
> -          thread_process_fd (&m->write, &writefd, &m->writefd);
> +          check_pollfds(m, &readfd);
>          }
>  
>  #if 0
> diff --git lib/thread.h lib/thread.h
> index dbf5f25..b4dc758 100644
> --- lib/thread.h
> +++ lib/thread.h
> @@ -24,6 +24,7 @@
>  #define _ZEBRA_THREAD_H
>  
>  #include <zebra.h>
> +#include <poll.h>
>  
>  struct rusage_t
>  {
> @@ -56,9 +57,16 @@ struct thread_master
>    struct thread_list ready;
>    struct thread_list unuse;
>    struct pqueue *background;
> -  fd_set readfd;
> -  fd_set writefd;
> -  fd_set exceptfd;
> +  struct
> +    {
> +      /* number of pfd stored in pfds */
> +      nfds_t pfdcount;
> +      /* number of pfd stored in pfds + number of snmp pfd */
> +      nfds_t pfdcountsnmp;
> +      /* number of pfd that fit in the allocated space of pfds */
> +      nfds_t pfdsize;
> +      struct pollfd *pfds;
> +    } pfd;
>    unsigned long alloc;
>  };
>  




More information about the Quagga-dev mailing list