/* job.c (ksh source listing) */

/***********************************************************************
*                                                                      *
*               This software is part of the ast package               *
*           Copyright (c) 1990-2007 AT&T Knowledge Ventures            *
*                      and is licensed under the                       *
*                  Common Public License, Version 1.0                  *
*                      by AT&T Knowledge Ventures                      *
*                                                                      *
*                A copy of the License is available at                 *
*            http://www.opensource.org/licenses/cpl1.0.txt             *
*         (with md5 checksum 059e8cd6165cb4c31e351f2b69388fd9)         *
*                                                                      *
*              Information and Software Systems Research               *
*                            AT&T Research                             *
*                           Florham Park NJ                            *
*                                                                      *
*                 Glenn Fowler <gsf@research.att.com>                  *
*                                                                      *
***********************************************************************/
#pragma prototyped
/*
 * Glenn Fowler
 * AT&T Bell Laboratories
 *
 * remote coshell server job and connection support
 */

#include "service.h"

/*
 * drop the connection on fd
 *
 * connection type specific cleanup is done first, then the
 * connection slot is cleared (type and error reset to 0) and
 * fd is passed to csfd() with CS_POLL_CLOSE
 *
 *	USER	jobs with a non-zero pid that belong to fd are
 *		detached (jp->fd=0) and killed with SIGKILL, then
 *		the INIT cleanup below is done too
 *	INIT	the other active fds recorded for the user are
 *		closed, the home count of the user's home shell is
 *		decremented and the pump and expr buffers are freed
 *	PASS	one reference to the attached job is released;
 *		the job is done when none remain, otherwise its
 *		lost time is set if not already set
 *	POLL	the lost connect stream is reported via error()
 *	SHELL	the shell fd is invalidated and the shell is closed
 */

void
drop(register int fd)
{
      register int      k;
      register int      aux;
      Coshell_t*  shell;
      Cojob_t*    job;

      switch (state.con[fd].type)
      {
      case USER:
            if (state.con[fd].info.user.running)
            {
                  for (job = state.job; job <= state.jobmax; job++)
                  {
                        if (job->fd != fd || !job->pid)
                              continue;
                        /* detach first so that jobdone() does not report back on fd */
                        job->fd = 0;
                        jobkill(job, SIGKILL);
                  }
            }
            /*FALLTHROUGH*/
      case INIT:
            for (k = 0; k < elementsof(state.con[fd].info.user.fds); k++)
            {
                  aux = state.con[fd].info.user.fds[k];
                  /* fd itself is closed at the end of this function */
                  if (aux == fd || aux < 0 || !state.con[aux].type)
                        continue;
                  close(aux);
                  state.con[aux].type = 0;
            }
            shell = state.con[fd].info.user.home;
            if (shell)
                  shell->home--;
            if (state.con[fd].info.user.pump)
                  free(state.con[fd].info.user.pump);
            if (state.con[fd].info.user.expr)
                  free(state.con[fd].info.user.expr);
            break;
      case PASS:
            job = state.con[fd].info.pass.job;
            if (job && job->pid)
            {
                  job->ref--;
                  if (job->ref <= 0)
                        jobdone(job);
                  else if (!job->lost)
                        job->lost = cs.time + UPDATE;
            }
            break;
      case POLL:
            error(ERROR_SYSTEM|3, "lost connect stream");
            break;
      case SHELL:
            shell = state.con[fd].info.shell;
            shell->fd = -1;
            shellclose(shell, -1);
            break;
      }

      /* release the connection slot */
      state.con[fd].type = 0;
      state.con[fd].error = 0;
      csfd(fd, CS_POLL_CLOSE);
}

/*
 * check the job table for hogs running on busy shells
 * or jobs queued on hung shells
 * if only!=0 then only that shell is checked
 *
 * every slot from state.job through state.jobmax with a non-zero
 * pid is examined:
 *
 *	pid > 0		the shell status may be refreshed via update(),
 *			jobs whose lost deadline has passed are finished
 *			with SIGKILL, and jobs on a shell with sp->idle set
 *			whose sp->stat.idle is below state.busy are either
 *			migrated, stopped or reported and marked HOG;
 *			HOG jobs on a shell that no longer qualifies as
 *			busy are continued
 *	pid < 0		only slots that still hold a command are handled:
 *			QUEUE jobs are passed to shellexec() when the
 *			server, user and host limits allow; otherwise the
 *			shell is closed once jp->start + LOST has passed
 *
 * the cs wakeup value is adjusted before returning
 *
 * NOTE(review): jobdone() may call jobcheck(NiL) again, so this
 * function can be re-entered while the loop is in progress
 */

void
jobcheck(Coshell_t* only)
{
      register Cojob_t* jp;
      register Coshell_t*     sp;
      char*             s;

      for (jp = state.job; jp <= state.jobmax; jp++)
            /* sp is assigned for every active slot; the slot is skipped if only is set and differs */
            if (jp->pid && ((sp = jp->shell) == only || !only))
            {
                  if (jp->pid > 0)
                  {
                        /* refresh the shell status once its update time is due, unless it has reached ERRORS errors */
                        if (sp->update <= cs.time && sp->errors < ERRORS) update(sp);
                        /* a lost deadline was set and has passed: finish the job as if killed */
                        if (jp->lost && jp->lost < cs.time)
                        {
                              message((-4, "jobcheck: %s: job %d pid %d lost", sp->name, jp->rid, jp->pid));
                              jp->sig = SIGKILL;
                              jobdone(jp);
                              continue;
                        }
                        if (sp->idle)
                        {
                              /* shell idle value is below the busy threshold and no bypass expression matches */
                              if (sp->stat.idle < state.busy && (!sp->bypass || !miscmatch(sp, sp->bypass)))
                              {
                                    /* first detection with a grace period configured: just record the deadline */
                                    if (!jp->busy && state.grace) jp->busy = cs.time + state.grace;
                                    /* otherwise act once the recorded deadline (0 if none) is in the past */
                                    else if (jp->busy < cs.time)
                                    {
                                          if (state.migrate)
                                          {
                                                int   n;

                                                /* tell the user (fds[2]), send the migrate command to the shell, then finish the job here */
                                                error(ERROR_OUTPUT|2, state.con[jp->fd].info.user.fds[2], "%s: job=%d pid=%d %s", sp->name, jp - state.job, jp->pid, state.migrate);
                                                n = sfprintf(state.string, "job=%d; pid=%d; host=%s; type=%s; %s\n", jp - state.job, jp->pid, sp->name, sp->type, state.migrate);
                                                if (s = sfstruse(state.string))
                                                      cswrite(jp->shell->fd, s, n);
                                                else
                                                      error(ERROR_OUTPUT|2, state.con[jp->fd].info.user.fds[2], "out of space");
                                                jp->sig = SIGKILL;
                                                jobdone(jp);
                                          }
                                          else
                                          {
                                                /* no migrate command: stop the job where SIGSTOP exists, else only report it */
#ifdef SIGSTOP
                                                error(ERROR_OUTPUT|2, state.con[jp->fd].info.user.fds[2], "%s: job=%d pid=%d stopped", sp->name, jp - state.job, jp->pid);
                                                jobkill(jp, SIGSTOP);
#else
                                                error(ERROR_OUTPUT|2, state.con[jp->fd].info.user.fds[2], "%s: job=%d pid=%d is a hog", sp->name, jp - state.job, jp->pid);
#endif
                                                /* set after jobkill(), which clears jp->busy */
                                                jp->busy = HOG;
                                          }
                                    }
                                    /* shell is still busy: skip the HOG restart check below */
                                    continue;
                              }
                        }
                        /* reached only when the busy test above did not hold: resume a job previously marked HOG */
                        if (jp->busy == HOG)
                        {
#ifdef SIGCONT
                              error(ERROR_OUTPUT|2, state.con[jp->fd].info.user.fds[2], "job %d pid %d restarted on %s", jp - state.job, jp->pid, sp->name);
                              /* jobkill() resets jp->busy to 0 */
                              jobkill(jp, SIGCONT);
#else
                              error(ERROR_OUTPUT|2, state.con[jp->fd].info.user.fds[2], "job %d pid %d is no longer hogging %s", jp - state.job, jp->pid, sp->name);
                              jp->busy = 0;
#endif
                        }
                  }
                  /* pid < 0 with a saved command */
                  else if (jp->cmd)
                  {
                        /* queued job whose shell has an fd > 0 or is the state.wait shell */
                        if (jp->pid == QUEUE && (sp->fd > 0 || sp == &state.wait))
                        {
                              /* start it only while the per-server, per-user and per-host (or per-cpu) limits are not exceeded */
                              if (state.running < (state.perserver + state.jobwait) && state.con[jp->fd].info.user.running < (state.peruser + 1) && (sp == &state.wait || sp->running < ((state.perhost ? state.perhost : sp->cpu * state.percpu) + 1)))
                                    shellexec(jp, jp->cmd, jp->fd);
                        }
                        /* NOTE(review): the test uses jp->start but the message reports sp->start -- confirm this is intended */
                        else if (cs.time > jp->start + LOST)
                        {
                              message((-4, "jobcheck: %s: possibly hung %s", sp->name, fmtelapsed(cs.time - sp->start, 1)));
                              shellclose(sp, -1);
                        }
                  }
            }
      /* jobs or shells are waiting: request a wakeup of UPDATE * 1000L (presumably milliseconds -- confirm) */
      if (state.jobwait || state.shellwait) cswakeup(state.wakeup = UPDATE * 1000L);
      /* no busy threshold or nothing running: set the wakeup to 0 */
      else if (!state.busy || !state.running) cswakeup(state.wakeup = 0L);
      /* full table check with jobs running: keep the periodic wakeup */
      else if (!only) cswakeup(state.wakeup = UPDATE * 1000L);
}

/*
 * kill job with sig
 *
 * nothing is done for a slot with pid==0
 * sig is recorded in jp->sig and jp->busy is cleared; if the job
 * has a positive pid and its shell has an fd > 0 then a kill
 * command for the negated pid is written to the shell
 * SIGKILL also finishes the job immediately via jobdone()
 */

void
jobkill(Cojob_t* jp, int sig)
{
      char  cmd[128];
      int   len;

      if (!jp->pid)
            return;
      jp->sig = sig;
      jp->busy = 0;
      if (jp->pid > 0 && jp->shell->fd > 0)
      {
            len = sfsprintf(cmd, sizeof(cmd), "kill -%s -%d\n", fmtsignal(-sig), jp->pid);
            cswrite(jp->shell->fd, cmd, len);
            message((-2, "killpg -%s %s.%d", fmtsignal(-sig), jp->shell->name, jp->pid));
      }
      if (sig == SIGKILL)
            jobdone(jp);
}

/*
 * job jp is done
 */

void
jobdone(register Cojob_t* jp)
{
      register int      n;
      char        buf[64];

      jp->pid = 0;
      jp->ref = 0;
      if (jp->cmd)
      {
            free(jp->cmd);
            jp->cmd = 0;
            state.jobwait--;
      }
      if (jp->shell == &state.wait) state.joblimit--;
      else if (jp->shell->running)
      {
            jp->shell->running--;
            if (!--state.running)
            {
                  state.real += cs.time - state.clock;
                  cswakeup(state.wakeup = 0L);
            }
      }
      if (jp->fd > 0)
      {
            jp->shell->user += jp->user;
            jp->shell->sys += jp->sys;
            state.con[jp->fd].info.user.running--;
            state.user += jp->user;
            state.sys += jp->sys;
            if (state.disable && (jp->sig == SIGKILL || jp->status > 128 && (jp->status % 128) == SIGKILL))
            {
                  jp->shell->mode |= SHELL_DISABLE;
                  jp->shell->update = cs.time + state.disable;
            }
#ifdef SIGCONT
            if (jp->sig && jp->sig != SIGCONT)
#else
            if (jp->sig)
#endif
                  jp->status = jp->sig + 128;
            n = sfsprintf(buf, sizeof(buf), "x %d %d %s %s\n", jp->rid, jp->status, fmtelapsed(jp->user, CO_QUANT), fmtelapsed(jp->sys, CO_QUANT));
            if (cswrite(state.con[jp->fd].info.user.fds[0], buf, n) != n)
                  drop(jp->fd);
      }
      if (state.joblimit) jobcheck(NiL);
}

/* Generated by Doxygen 1.6.0 */