Wed Sep 23 10:22:33 2009: Request 49946 was acted upon. Transaction: Ticket created by john.el...@comcast.net Queue: Win32-Job Subject: Please add READ share to output logs created by Win32::Job->spawn Broken in: 0.03 Severity: Wishlist Owner: Nobody Requestors: john.el...@comcast.net Status: new Ticket <URL: https://rt.cpan.org/Ticket/Display.html?id=49946 >
Win32::Job can be used to create a process whose output to stdout and stderr is captured in log files designated in the options argument to "spawn". The log files are created with the FILE_SHARE_WRITE option, but not the FILE_SHARE_READ option. In Windows (XP), FILE_SHARE_WRITE apparently does not include FILE_SHARE_READ (i.e., does not allow shared read-only access - this came as a surprise to me too! See http://msdn.microsoft.com/en-us/library/aa363874(VS.85).aspx). So, while the process is running it is not possible for an external source to "type" or "tail" the log file. I suspect this is not intended behavior, but it is not certain so I report this as an enhancement request. This would be useful if, for example, the user could trigger the handler function passed to "watch" to abort on a user keypress or some other mechanism. In my installation, I was not able to watch the log to determine if my process was hung or it was in a "runaway" state. Simply adding FILE_SHARE_READ to the creation options would fix this problem. Specifically, at lines 464 and 488 of "Job.xs", please consider changing the content of the line from "FILE_SHARE_WRITE," to "FILE_SHARE_WRITE | FILE_SHARE_READ,". I did successfully try this change out in my installation, and the logs were indeed viewable (at least by 'type'). Thank you for your consideration of this enhancement request. Operating environment: Windows XP Professional SP2 Perl: ActiveState Perl 5.8.8.822 MSWin32 x86 multithread Package: Win32::Job (source code modification applied to 0.03).
#define _WIN32_WINNT 0x0500 #include <windows.h> #include "EXTERN.h" #include "perl.h" #include "XSUB.h" #ifdef __CYGWIN__ # define win32_get_osfhandle _get_osfhandle #endif #define NEWZ_CONST_INT 413 #define KILL_EXITCODE 293 #define AV_REAL_LEN(av) (av_len(av) + 1) /* In case we're building on VC98, define these macros so we can still run * the code on the appropriate platforms */ #ifndef CREATE_BREAKAWAY_FROM_JOB #define CREATE_BREAKAWAY_FROM_JOB 0x01000000 #endif #ifndef JOB_OBJECT_LIMIT_BREAKAWAY_OK #define JOB_OBJECT_LIMIT_BREAKAWAY_OK 0x00000800 #endif /* For non-threaded Perl */ #ifndef pTHX #define pTHX /* empty */ #define aTHX /* empty */ #define pTHX_ /* empty */ #define aTHX_ /* empty */ #endif /* This structure contains the HANDLE for the job object, plus an * array of pointers to PROCESS_INFORMATION structures (one for each * process spawn()ed). We remember these so we can call CloseHandle() * during DESTROY(), and so we can call ResumeThread() on each of them * during the watch() and run() calls. */ typedef struct { HANDLE hJob; /* the job */ AV* procs; /* processes in the job */ HV* info; /* process status info */ } job_t; typedef job_t *JOB_T; typedef PROCESS_INFORMATION *PROC_T; typedef struct { LARGE_INTEGER PerProcessUserTimeLimit; LARGE_INTEGER PerJobUserTimeLimit; DWORD LimitFlags; SIZE_T MinimumWorkingSetSize; SIZE_T MaximumWorkingSetSize; DWORD ActiveProcessLimit; #ifdef _WIN64 unsigned __int64 Affinity; #else DWORD Affinity; #endif DWORD PriorityClass; DWORD SchedulingClass; } MY_JOBOBJECT_BASIC_LIMIT_INFORMATION; typedef struct { ULONGLONG ReadOperationCount; ULONGLONG WriteOperationCount; ULONGLONG OtherOperationCount; ULONGLONG ReadTransferCount; ULONGLONG WriteTransferCount; ULONGLONG OtherTransferCount; } MY_IO_COUNTERS; typedef struct { MY_JOBOBJECT_BASIC_LIMIT_INFORMATION BasicLimitInformation; MY_IO_COUNTERS IoInfo; SIZE_T ProcessMemoryLimit; SIZE_T JobMemoryLimit; SIZE_T PeakProcessMemoryUsed; SIZE_T PeakJobMemoryUsed; } MY_JOBOBJECT_EXTENDED_LIMIT_INFORMATION; #define JobObjectExtendedLimitInformation 9 static HANDLE create_job_object() { MY_JOBOBJECT_EXTENDED_LIMIT_INFORMATION jobinfo; HANDLE job = CreateJobObject(NULL, NULL); memset(&jobinfo, 0, sizeof(jobinfo)); if (job && QueryInformationJobObject(job, JobObjectExtendedLimitInformation, &jobinfo, sizeof(jobinfo), NULL)) { jobinfo.BasicLimitInformation.LimitFlags |= JOB_OBJECT_LIMIT_BREAKAWAY_OK; SetInformationJobObject(job, JobObjectExtendedLimitInformation, &jobinfo, sizeof(jobinfo)); } return job; } /* Called to resume all the threads by watch() and run() */ static void resume_threads(pTHX_ AV *procs) { I32 i, imax = AV_REAL_LEN(procs); for (i = 0; i < imax; i++) { STRLEN l; SV* tmp = *av_fetch(procs, i, 0); PROC_T inf = (PROC_T)SvPV(tmp, l); ResumeThread(inf->hThread); } } static void free_threads(pTHX_ AV *procs) { I32 i, imax = AV_REAL_LEN(procs); for (i = 0; i < imax; i++) { STRLEN l; SV* tmp = *av_fetch(procs, i, 0); PROC_T inf = (PROC_T)SvPV(tmp, l); CloseHandle(inf->hThread); CloseHandle(inf->hProcess); } } /* Called to remember/close files created with CreateFile */ static SV* new_handle(pTHX_ HANDLE file) { SV* rv = newSViv(0); /* blank SV */ sv_setref_iv(rv, "Win32::Job::_handle", PTR2IV(file)); return rv; } static void get_status(pTHX_ JOB_T self, int wait) { I32 i, imax = AV_REAL_LEN(self->procs); if (imax) hv_clear(self->info); for (i = 0; i < imax; i++) { STRLEN l; SV *tmp = *av_fetch(self->procs, i, 0); PROC_T inf = (PROC_T)SvPV(tmp, l); HV *proc = newHV(); HV *htime = newHV(); SV *ent = newSVuv(inf->dwProcessId); DWORD ecode; FILETIME stime, etime, ktime, utime; double te, tk, tu; /* Wait for the process to finish terminating */ if (wait) WaitForSingleObject(inf->hProcess, INFINITE); /* Get information about the process (only care about user and * kernel times */ GetExitCodeProcess(inf->hProcess, &ecode); GetProcessTimes(inf->hProcess, &stime, &etime, &ktime, &utime); { ULARGE_INTEGER user, kernel, start, end, elapsed; kernel.LowPart = ktime.dwLowDateTime; kernel.HighPart = ktime.dwHighDateTime; user.LowPart = utime.dwLowDateTime; user.HighPart = utime.dwHighDateTime; start.LowPart = stime.dwLowDateTime; start.HighPart = stime.dwHighDateTime; end.LowPart = etime.dwLowDateTime; end.HighPart = etime.dwHighDateTime; if (!end.QuadPart) { /* process is not finished yet */ SYSTEMTIME now; GetSystemTime(&now); SystemTimeToFileTime(&now, &etime); end.LowPart = etime.dwLowDateTime; end.HighPart = etime.dwHighDateTime; } elapsed.QuadPart = end.QuadPart - start.QuadPart; /* We must cast to signed __int64 because MSVC++ can't * convert unsigned __int64 to double. It's probably okay; * if the process is running long enough to overflow a * signed 64-bit integer, it won't fit into a double * anyway. */ tk = ((__int64) kernel.QuadPart) / 10000000.0; tu = ((__int64) user.QuadPart) / 10000000.0; te = ((__int64)elapsed.QuadPart) / 10000000.0; } /* Create a tree structure like this: * <pid>: * exitcode: 123 * time: * user: 123 * kernel: 123 * elapsed: 123 */ hv_store(htime, "user", 4, newSVnv(tu), 0); hv_store(htime, "kernel", 6, newSVnv(tk), 0); hv_store(htime, "elapsed", 7, newSVnv(te), 0); hv_store(proc, "exitcode", 8, newSVuv(ecode), 0); hv_store(proc, "time", 4, newRV_noinc((SV*)htime), 0); hv_store_ent(self->info, ent, newRV_noinc((SV*)proc), 0); SvREFCNT_dec(ent); /* free */ } } /* Kills the threads running in the Job, collecting information about how long * each process has been running at the same time. */ static void kill_threads(pTHX_ JOB_T self) { TerminateJobObject(self->hJob, KILL_EXITCODE); get_status(aTHX_ self, 1); /* get status (and wait for the exitcode) */ free_threads(aTHX_ self->procs); av_clear(self->procs); } /* This function checks an SV* to see if it contains an IO* structure. This * code is taken from sv.c's sv_2io(). Unfortunately, *that* code throws * exceptions, and I just want to know if it will work or not, without having * to set up a new frame. */ static int /* bool */ sv_isio(pTHX_ SV *sv) { IO *io; GV *gv; STRLEN n_a; switch (SvTYPE(sv)) { case SVt_PVIO: io = (IO*)sv; return 1; case SVt_PVGV: gv = (GV*)sv; io = GvIO(gv); if (!io) return 0; return 1; default: if (!SvOK(sv)) return 0; if (SvROK(sv)) return sv_isio(aTHX_ SvRV(sv)); gv = gv_fetchpv(SvPV(sv,n_a), FALSE, SVt_PVIO); if (gv) return 1; else return 0; } return 0; } MODULE = Win32::Job PACKAGE = Win32::Job::_handle PROTOTYPES: DISABLE void DESTROY(SV* self) PREINIT: IV iv; HANDLE h; CODE: iv = SvIV(SvRV(self)); h = INT2PTR(HANDLE, iv); if (h) CloseHandle(h); MODULE = Win32::Job PACKAGE = Win32::Job PROTOTYPES: DISABLE JOB_T new(klass) SV* klass PREINIT: JOB_T job; CODE: Newz(NEWZ_CONST_INT, job, 1, job_t); job->hJob = create_job_object(); job->procs = newAV(); job->info = newHV(); RETVAL = job; if (!RETVAL) XSRETURN_UNDEF; OUTPUT: RETVAL void DESTROY(self) JOB_T self CODE: kill_threads(aTHX_ self); CloseHandle(self->hJob); SvREFCNT_dec(self->procs); SvREFCNT_dec(self->info); Safefree(self); void kill(self) JOB_T self CODE: kill_threads(aTHX_ self); IV spawn(self, svexe, args, ...) JOB_T self SV* svexe char* args PREINIT: char* exe; char* cwd = "."; /* cwd of the child process */ HV * opts; AV * files; STARTUPINFO st; PROC_T procinfo; BOOL ok; SV * ary_entry; DWORD createflags = (CREATE_SUSPENDED | CREATE_BREAKAWAY_FROM_JOB); char pbuf[MAX_PATH]; /* static buffer for 'exe' */ void *env = NULL; CODE: files = (AV*)sv_2mortal((SV*)newAV()); /* Store procinfo in an SV, to avoid worrying about memory */ ary_entry = NEWSV(NEWZ_CONST_INT, sizeof(PROCESS_INFORMATION)); SvPOK_on(ary_entry); SvCUR_set(ary_entry, sizeof(PROCESS_INFORMATION)); *(SvEND(ary_entry)) = 0; /* NULL-terminated */ procinfo = (PROC_T)SvPVX(ary_entry); /* Check whether 'exe' is NULL */ SvGETMAGIC(svexe); /* so SvOK() works */ if (SvOK(svexe)) exe = SvPV(svexe, PL_na); else exe = NULL; /* Set up a lame-oh STARTUPINFO structure */ memset(&st, 0, sizeof(STARTUPINFO)); st.cb = sizeof(STARTUPINFO); st.dwFlags = STARTF_USESTDHANDLES; st.hStdInput = GetStdHandle(STD_INPUT_HANDLE); st.hStdOutput = GetStdHandle(STD_OUTPUT_HANDLE); st.hStdError = GetStdHandle(STD_ERROR_HANDLE); st.lpDesktop = NULL; st.lpTitle = NULL; st.lpReserved = NULL; st.cbReserved2 = 0; st.lpReserved2 = NULL; /* Munge `exe' if there are no path separator in it */ if (exe && !strchr(exe, '/') && !strchr(exe, '\\')) { char *exts[] = { ".exe", ".com", ".bat", NULL }; char *ext = strchr(exe, '.'); /* is there an extension? */ char *path = PerlEnv_getenv("PATH"); char *curr = path; char *endp = strchr(curr, ';'); size_t len; Stat_t sbuf; while (endp) { len = endp - curr; strncpy(pbuf, curr, len); pbuf[len] = '\0'; if (pbuf[len-1] != '\\' && pbuf[len-1] != '/') strcat(pbuf, "/"); strcat(pbuf, exe); /* If the extension was given, check it */ if (ext) { if (PerlLIO_stat(pbuf, &sbuf) == 0) { exe = pbuf; goto exe_found; /* break */ } } /* otherwise try each of the three extensions */ else { int i; len = strlen(pbuf); for (i = 0; exts[i]; ++i) { strcpy(pbuf + len, exts[i]); /* check for file existence */ if (PerlLIO_stat(pbuf, &sbuf) == 0) { exe = pbuf; goto exe_found; /* break; break */ } } } /* select the next one */ curr = endp + 1; endp = strchr(curr, ';'); } } exe_found: /* Modify the `st' structure depending on what options are passed in * the `opts' hash */ if (items >= 4 && SvROK(ST(3)) && SvTYPE(SvRV(ST(3))) == SVt_PVHV) { opts = (HV*)SvRV(ST(3)); if (hv_exists(opts, "cwd", 3)) cwd = SvPV_nolen((SV*)*hv_fetch(opts, "cwd", 3, 0)); if (hv_exists(opts, "new_console", 11) && SvTRUE((SV*)*hv_fetch(opts, "new_console", 11, 0))) createflags |= CREATE_NEW_CONSOLE; if (hv_exists(opts, "window_attr", 11)) { char *tmp = SvPV_nolen(*hv_fetch(opts, "window_attr", 11, 0)); if (strEQ(tmp, "minimized")) { st.wShowWindow = SW_SHOWMINIMIZED; st.dwFlags |= STARTF_USESHOWWINDOW; } else if (strEQ(tmp, "maximized")) { st.wShowWindow = SW_SHOWMAXIMIZED; st.dwFlags |= STARTF_USESHOWWINDOW; } else if (strEQ(tmp, "hidden")) { st.wShowWindow = SW_HIDE; st.dwFlags |= STARTF_USESHOWWINDOW; } } if (hv_exists(opts, "new_group", 10) && SvTRUE((SV*)*hv_fetch(opts, "new_group", 10, 0))) createflags |= CREATE_NEW_PROCESS_GROUP; if (hv_exists(opts, "no_window", 9) && SvTRUE((SV*)*hv_fetch(opts, "no_window", 9, 0))) createflags |= CREATE_NO_WINDOW; if (hv_exists(opts, "stdin", 5)) { SV *tmp = (SV*)*hv_fetch(opts, "stdin", 5, 0); if (sv_isio(aTHX_ tmp)) { int fd = PerlIO_fileno(IoIFP(sv_2io(tmp))); st.hStdInput = (HANDLE)win32_get_osfhandle(fd); } else { HANDLE t = CreateFile( SvPV_nolen(tmp), GENERIC_READ, FILE_SHARE_READ, NULL, /* safe on W2K and XP */ OPEN_EXISTING, FILE_ATTRIBUTE_NORMAL, NULL ); if (t == INVALID_HANDLE_VALUE) XSRETURN_UNDEF; st.hStdInput = t; av_push(files, new_handle(aTHX_ st.hStdInput)); } SetHandleInformation(st.hStdInput, HANDLE_FLAG_INHERIT, HANDLE_FLAG_INHERIT); } if (hv_exists(opts, "stdout", 6)) { SV *tmp = (SV*)*hv_fetch(opts, "stdout", 6, 0); if (sv_isio(aTHX_ tmp)) { int fd = PerlIO_fileno(IoOFP(sv_2io(tmp))); st.hStdOutput = (HANDLE)win32_get_osfhandle(fd); } else { HANDLE t = CreateFile( SvPV_nolen(tmp), GENERIC_WRITE, FILE_SHARE_WRITE|FILE_SHARE_READ, NULL, OPEN_ALWAYS, FILE_ATTRIBUTE_NORMAL, NULL ); if (t == INVALID_HANDLE_VALUE) XSRETURN_UNDEF; st.hStdOutput = t; av_push(files, new_handle(aTHX_ st.hStdOutput)); } SetHandleInformation(st.hStdOutput, HANDLE_FLAG_INHERIT, HANDLE_FLAG_INHERIT); } if (hv_exists(opts, "stderr", 6)) { SV *tmp = (SV*)*hv_fetch(opts, "stderr", 6, 0); if (sv_isio(aTHX_ tmp)) { int fd = PerlIO_fileno(IoOFP(sv_2io(tmp))); st.hStdError = (HANDLE)win32_get_osfhandle(fd); } else { HANDLE t = CreateFile( SvPV_nolen(tmp), GENERIC_WRITE, FILE_SHARE_WRITE|FILE_SHARE_READ, NULL, OPEN_ALWAYS, FILE_ATTRIBUTE_NORMAL, NULL ); if (t == INVALID_HANDLE_VALUE) XSRETURN_UNDEF; st.hStdError = t; av_push(files, new_handle(aTHX_ st.hStdError)); } SetHandleInformation(st.hStdError, HANDLE_FLAG_INHERIT, HANDLE_FLAG_INHERIT); } } #ifdef PERL_IMPLICIT_SYS env = PerlEnv_get_childenv(); #endif ok = CreateProcess( exe, /* search PATH to find executable */ args, /* executable, and its arguments */ NULL, /* process security */ NULL, /* thread security */ TRUE, /* inherit handles */ createflags, /* creation flags */ env, /* inherit environment */ cwd, /* current directory */ &st, procinfo ); #ifdef PERL_IMPLICIT_SYS PerlEnv_free_childenv(env); #endif if (!ok) XSRETURN_UNDEF; /* Add the new process to the list of processes */ av_push(self->procs, ary_entry); /* Add the new process to the Job */ if (!AssignProcessToJobObject(self->hJob, procinfo->hProcess)) XSRETURN_UNDEF; /* Return the new PID */ RETVAL = procinfo->dwProcessId; OUTPUT: RETVAL int run(self, timeout, ...) JOB_T self double timeout PREINIT: BOOL which = 1; /* wait for ALL processes to complete */ HANDLE *hlist; DWORD ret, dwTimeout; I32 i, imax; CODE: if (items >= 3 && !SvTRUE(ST(2))) which = 0; /* wait for ANY process to complete */ imax = AV_REAL_LEN(self->procs); Newz(NEWZ_CONST_INT, hlist, imax, HANDLE); SAVEFREEPV(hlist); if (!timeout) dwTimeout = INFINITE; else dwTimeout = (DWORD) (timeout * 1000.0); for (i = 0; i < imax; i++) { STRLEN l; SV *tmp = *av_fetch(self->procs, i, 0); PROC_T inf = (PROC_T)SvPV(tmp, l); hlist[i] = inf->hProcess; } resume_threads(aTHX_ self->procs); ret = WaitForMultipleObjects(imax, hlist, which, dwTimeout); RETVAL = 0; if (ret >= WAIT_OBJECT_0 && ret <= WAIT_OBJECT_0 + imax) { RETVAL = 1; /* finished */ } kill_threads(aTHX_ self); OUTPUT: RETVAL int watch(self, callback, interval, ...) JOB_T self SV* callback double interval PREINIT: BOOL which = 1; /* wait for ALL processes to complete */ DWORD ret, dwInterval; HANDLE *hlist; I32 i, imax; IV stop; CODE: imax = AV_REAL_LEN(self->procs); Newz(NEWZ_CONST_INT, hlist, imax, HANDLE); SAVEFREEPV(hlist); /* free hlist on pseudo-scope exit */ if (items >= 4 && !SvTRUE(ST(3))) which = 0; /* wait for ANY process to complete */ if (!interval) XSRETURN_UNDEF; /* you suck, programmer! */ dwInterval = (DWORD)interval * 1000; for (i = 0; i < imax; i++) { STRLEN l; SV *tmp = *av_fetch(self->procs, i, 0); PROC_T inf = (PROC_T)SvPV(tmp, l); hlist[i] = inf->hProcess; } resume_threads(aTHX_ self->procs); RETVAL = 0; do { SV *sv_self = ST(0); /* copy of self as an SV */ stop = 0; ret = WaitForMultipleObjects(imax, hlist, which, dwInterval); /* Call user's function if we've timed out (else break) */ if (ret == WAIT_TIMEOUT) { I32 count; ENTER; SAVETMPS; PUSHMARK(SP); XPUSHs(sv_self); PUTBACK; count = call_sv(callback, G_SCALAR | G_EVAL); SPAGAIN; if (count != 1) croak("Watchdog callback did not returned >1 result."); stop = POPi; PUTBACK; FREETMPS; LEAVE; } else { stop = 1; RETVAL = 1; } } while (!stop); /* Kill the processes */ kill_threads(aTHX_ self); OUTPUT: RETVAL HV* status(self) JOB_T self CODE: get_status(aTHX_ self, 0); /* query w/o waiting for processes */ RETVAL = self->info; OUTPUT: RETVAL