#!/usr/bin/env isis-script
% -*- mode: SLang; mode: fold -*-

% cl_master:  Master program to manage parallel computation of   % {{{
%             single-parameter confidence limits, using PVM, as
%             described in "Using the Parallel Virtual Machine
%             for Everyday Analysis," by Noble et al 2006
%             (http://arxiv.org/abs/astro-ph/0510688)
%
%             See also cl_slave.
%
% Authors:    John C. Houck
%             Michael S. Noble
%
% Usage:  ./cl_master [options] /path_on_all_machines/to/run_dir [param_list]
%
% The runtime directory must contain an init.sl script which initializes
% data & model parameters (the master appends "init.sl" to the given path).
%
% Output files:
%
%    best.$master_pid           Best-fit parameter file
%
%    limits.$master_pid         History of single-parameter confidence
%                               limit computations.  When the master
%                               exits normally, the final confidence
%                               limits are the last N values listed.
%
%    slave_stdout.$master_pid   Messages generated to stdout by slaves

variable _version = 300;
variable _version_string = "0.3.0";
% }}}

% Setup (command line arg processing, module loading, etc) {{{

% Command-line configurable settings and their defaults.
variable Empty = "";
variable Debug_PVM = 0;
variable Num_Processes_Per_Host = 2;
variable Tolerance = "10^-3";           % ISIS conf() default
variable Level = "1";                   % ISIS conf() default [90%]
% Entries 0..2 are the numeric level codes; entries 3..5 are the matching
% percentage spellings (so Valid_Levels[k+3] is the label for code k).
variable Valid_Levels = ["0", "1", "2", "68%", "90%", "99%"];
variable Setup_File = "init.sl";
variable Sub_Dir;
variable Free_Param_List = NULL;

% Run-time state shared by the callbacks below.
variable Num_Limits_Finished = 0;
variable Best_Statistic = NULL;
variable Best_Params = NULL;
variable Stdout_Fname = sprintf("slave_stdout.%d", getpid());
variable Best_Fname = sprintf ("best.%d", getpid());
variable Limit_Fname = sprintf ("limits.%d", getpid());
variable Limit_Fp = NULL;
variable Limits = Assoc_Type[Any_Type, NULL];
variable Statistic;
variable Num_Bins;
variable Num_Free_Params;
variable Num_Slaves_Spawned = 0;
variable Start_Time;

% Print the usage message to stderr and exit with status 0.
define instruct() % {{{
{
   () = fprintf(stderr,
     "\nUsage: %s [options] path_to_runtime_directory [param_list]\n\n"+
     "With cl_slave, computes ISIS single-parameter confidence limits in\n"+
     "parallel, using PVM, as described in 'Using the Parallel Virtual\n"+
     "Machine for Everyday Analysis' by Noble et al 2006\n"+
     "(http://arxiv.org/abs/astro-ph/0510688).\n"+
     "\nOptions:\n"+
     " -g enable S-Lang and PVM debugging aids\n"+
     " -level=LEV_VALUE override the default 90%% confidence level\n"+
     " LEV_VALUE may be 0, 1, 2 or 68%%, 90%%, 99%%\n"+
     " -nph=INTEGER_VALUE override default num processes per host (2)\n"+
     " -toler=REAL_VALUE override the default 10^-3 conf() tolerance\n"+
     "\nVersion: %s\n", path_basename(__argv[0]), _version_string);
   exit(0);
}
% }}}

% Parse __argv: leading "-opt[=value]" switches first, then the runtime
% directory (stored in Sub_Dir) and an optional parameter list (stored,
% unevaluated, in Free_Param_List).  Exits via instruct() when no runtime
% directory is given, and with status 1 when it contains no init.sl.
define process_args() % {{{
{
   variable i = 1, argc = __argc;

   if (argc < 2) instruct;

   while (i < argc)
     {
        % Split "-name=value" into its name and (possibly empty) value.
        variable value = Empty;
        variable arg = strtok(__argv[i], "=");
        if (length(arg) > 1) value = arg[1];
        arg = arg[0];

        switch (arg)
          { case "-g" :
               _slangtrace = 1;
               _traceback = -1;
               Debug_PVM = 1;
          }
          { case "-level" :
               variable idx = wherefirst(value == Valid_Levels);
               if (idx == NULL)
                 vmessage("Invalid level <%S>, using default.", value);
               else
                 % Fold the percentage spellings (3..5) onto codes 0..2.
                 Level = Valid_Levels[idx mod 3];
          }
          { case "-nph" :
               value = atoi(value);
               if (value < 1)
                 vmessage("Invalid -nph value <%S>, must be >= 1.", value);
               else
                 Num_Processes_Per_Host = value;
          }
          { case "-toler" :
               if (feqs(atof(value), 0.0))
                 vmessage("Invalid tolerance <%S>, using default.", value);
               else
                 Tolerance = value;
          }
          {
             % First unrecognized argument ends option processing.
             break;
          }

        i++;
     }

   argc--;                              % skip process name
   if (i > argc) instruct();

   Sub_Dir = __argv[i];
   if (NULL == stat_file (path_concat (Sub_Dir, Setup_File)))
     {
        vmessage ("*** %s not found", path_concat (Sub_Dir, Setup_File));
        exit(1);
     }

   if (i < argc)
     Free_Param_List = __argv[i+1];
}
% }}}

% Arguments are checked before the PVM modules are loaded.
process_args();

require ("pvm_ms");
require ("pvm_msgid");
% }}}

% Return (on the stack) the time elapsed since Start_Time, formatted as
% days/hours/mins/sec.
define elapsed() % {{{
{
   variable runtime = _time() - Start_Time;     % runtime, in seconds
   variable ndays = runtime / (24*60*60);
   variable hours = runtime mod (24*60*60);     % seconds left after whole days
   variable nhours = hours / 3600;
   variable mins = hours mod 3600;              % seconds left after whole hours

   sprintf("%02d days %02d hours %02d mins %02d sec",
           ndays, nhours, mins / 60, mins mod 60);
}
% }}}

% Append one finished confidence limit (cl) to the history file fp and
% record it in Limits, keyed by parameter name, as [value, lo, hi].
define save_limit (fp, cl) %{{{
{
   () = fprintf (fp, "# Node: %s Elapsed Time: %s\n", cl.hostname, elapsed);
   () = fprintf (fp, "%20s %15.7e %15.7e %15.7e\n",
                 cl.name, cl.value, cl.lo, cl.hi);
   () = fflush (fp);
   Limits[cl.name] = [cl.value, cl.lo, cl.hi];
}
%}}}

% Note in the history file fp that the result cl carried an improved
% fit statistic.
define save_fit_stat(fp, cl) %{{{
{
   () = fprintf (fp, "# Better: %s=%4.5f %s %s %s\n",
                 Statistic, cl.statistic, cl.name, time, cl.hostname);
   () = fflush (fp);
}
%}}}

% Return (on the stack) a block of "#"-prefixed header lines summarizing
% the fit statistic, confidence level, tolerance and parameter counts.
% The argument is unused.
% NOTE(review): the name suggests ISIS calls this from save_par() -- confirm.
public define isis_save_par_hook(dummy_filename) % {{{
{
   variable dof = Num_Bins - Num_Free_Params;
   variable reduced = "";
   variable free_params = Free_Param_List;

   if (Best_Statistic != NULL and Statistic == "chisqr")
     reduced = sprintf("# Reduced chisqr = %S\n", Best_Statistic / dof);

   if (free_params == NULL)
     free_params = "all";

   sprintf("# %s = %S\n%s"+
           "# Confidence Level = %s (%s)\n"+
           "# Tolerance = %s\n"+
           "# Num Bins = %d\n"+
           "# Free Params = %s\n"+
           "# Num Free Params = %d\n"+
           "# Degrees of Freedom = %d\n#\n"+
           "# Each parameter limits line below is preceded by a timestamped\n"+
           "# comment indicating where it was computed & when it finished.\n"+
           "#\n",
           Statistic, Best_Statistic, reduced,
           Level, Valid_Levels[integer(Level)+3],
           Tolerance, Num_Bins, free_params, Num_Free_Params, dof);
}
% }}}

% Adopt params as the current parameter set, save it to Best_Fname, reset
% the finished-limit counter, and send the new parameters to every PVM
% task other than the master.
define restart_slaves_with_new_params (params) %{{{
{
   Num_Limits_Finished = 0;

   set_params (params);
   save_par ( Best_Fname );

   variable master_tid = pvm_mytid();
   variable s = pvm_tasks (0);

   foreach (s.ti_tid)
     {
        variable tid = ();
        if (master_tid == tid)
          continue;
        pvm_psend (tid, USER_SLAVE_NEWTASK);
        pvm_send_obj (tid, USER_SLAVE_RESULT, params);
     }
}
%}}}

% Shut down every PVM task other than the master.  With a non-zero sig
% (signal-handler use) the slaves are killed and the master exits with
% status sig; with sig == 0 each slave is sent USER_SLAVE_EXIT instead.
define exit_all_slaves (sig) %{{{
{
   variable master_tid = pvm_mytid();
   variable s = pvm_tasks (0);
   variable i = where (s.ti_tid != master_tid);

   if (sig)
     {
        % i holds array indices, so test how many there are: any(i) would
        % be false for i == [0] and leave a slave at index 0 running.
        if (length(i))
          {
             () = fprintf(stderr, "Caught signal %S: killing slaves\n", sig);
             () = fflush(stderr);
             array_map (Void_Type, &pvm_kill, s.ti_tid[i]);
          }
        exit(sig);
     }

   array_map (Void_Type, &pvm_psend, s.ti_tid[i], USER_SLAVE_EXIT);
}

% Quitting master via signal should also bring down slaves
variable DownSignals = [SIGINT, SIGTERM, SIGQUIT, SIGSEGV, SIGABRT];
% exit_all_slaves is installed as the handler for each signal listed.
array_map(Void_Type, &signal, DownSignals, &exit_all_slaves);
%}}}

% Message callback registered with pvm_ms (see start()): dispatches on
% msgid for a message from task tid.  Always returns 1.
define handle_user_message (msgid, tid) %{{{
{
   variable cl;

   switch (msgid)
     {
      case USER_SLAVE_NEWTASK:
        cl = pvm_recv_obj();
        if (orelse {Best_Statistic == NULL} {cl.statistic < Best_Statistic})
          {
             % First report, or a lower statistic than the best so far:
             % record it and restart all slaves from these parameters.
             Best_Statistic = cl.statistic;
             Best_Params = cl.params;
             save_fit_stat(Limit_Fp, cl);
             restart_slaves_with_new_params (Best_Params);
          }
        else
          {
             % Not an improvement: send the current best back to the sender.
             pvm_psend (tid, USER_SLAVE_NEWTASK);
             pvm_send_obj (tid, USER_SLAVE_RESULT, Best_Params);
          }
     }
     {
      case USER_SLAVE_RESULT:
        cl = pvm_recv_obj ();
        save_limit (Limit_Fp, cl);
        Num_Limits_Finished++;
        if (Num_Limits_Finished == Num_Free_Params)
          exit_all_slaves (0);
     }

   return 1;
}
%}}}

% Build the argv array used to launch cl_slave for parameter `name`.
define build_cmd (name) %{{{
{
   return ["cl_slave", name, Sub_Dir, Level, Tolerance];
}
%}}}

% Return the names of the non-frozen parameters, restricted to the
% command-line parameter list when one was given.
define get_free_param_names () %{{{
{
   variable p;

   % NOTE(review): Free_Param_List is command-line text passed to eval().
   if (Free_Param_List != NULL)
     p = get_params ( eval(Free_Param_List) );
   else
     p = get_params();

   variable names = array_struct_field (p, "name");
   variable frozen = array_struct_field (p, "freeze");

   return names[where(frozen == 0)];
}
%}}}

% Slave-spawned callback registered with pvm_ms (see start()): counts the
% spawn and prints a one-line report.
define handle_slave_spawn(tid, host, argv) % {{{
{
   Num_Slaves_Spawned++;
   vmessage ("%3d: %s %s spawned on %s %s\n", Num_Slaves_Spawned,
             argv[0], argv[1], host, strftime("%a %b %d %H:%M %Y"));
}
% }}}

% Enter the runtime directory, open the limits history file, configure
% pvm_ms, evaluate init.sl, and write the run header to the history file.
% Exits with status 1 if the directory or the history file is unusable.
define start() % {{{
{
   % Every file opened from here on is relative to Sub_Dir, so a failed
   % chdir must not be ignored.
   if (-1 == chdir (Sub_Dir))
     {
        vmessage ("*** Error: failed changing directory to %s", Sub_Dir);
        exit(1);
     }

   Limit_Fp = fopen (Limit_Fname, "a");
   if (Limit_Fp == NULL)
     {
        vmessage ("*** Error: failed opening %s", Limit_Fname);
        exit(1);
     }

   % NOTE(review): Tolerance may be command-line text (-toler) and is
   % passed to eval() here.
   Tolerance = string(eval(Tolerance));

   Start_Time = _time;
   variable s = sprintf("\n# Started: %S\n", ctime(Start_Time));
   % Pass computed text as data, never as the format string.
   () = fprintf(Limit_Fp, "%s", s);
   () = printf("%s", s);

   vmessage("\nBest fit parameters will be written to %S", Best_Fname);
   vmessage("Confidence limits history will be written to %S", Limit_Fname);
   vmessage("Slaves stdout will be captured to %S", Stdout_Fname);
   vmessage("Confidence levels will be computed to %S, with %S tolerance.\n",
            Valid_Levels[integer(Level)+3], Tolerance);

   pvm_ms_set_debug (Debug_PVM);
   pvm_ms_set_num_processes_per_host (Num_Processes_Per_Host);
   pvm_ms_set_message_callback (&handle_user_message);
   pvm_ms_set_slave_spawned_callback (&handle_slave_spawn);

   () = evalfile (Setup_File);

   variable info;
   () = eval_counts(&info);
   % Keep only the text before the first ";" of the statistic description.
   Statistic = strtok( get_fit_statistic(), ";")[0];
   Num_Free_Params = info.num_variable_params;
   Num_Bins = info.num_bins;

   set_float_format("%4.6f");
   () = fprintf(Limit_Fp, "%S", isis_save_par_hook(0));
}
% }}}

% Write the slaves' captured stdout to Stdout_Fname, close out the limits
% history, store the final limits as each parameter's min/max, save the
% parameters to Best_Fname, and exit with status 0.
define finish(exit_status) %{{{
{
   % start() already changed into Sub_Dir; prefixing Sub_Dir again would
   % resolve a relative Sub_Dir twice and make the open fail.
   variable file = Stdout_Fname;
   variable fp = fopen (file, "w");
   if (fp == NULL)
     {
        vmessage ("failed opening %s for writing", file);
        exit(1);
     }

   foreach (exit_status)
     {
        variable s = ();
        () = fprintf (fp, "[%s exit %d]==>\n%S\n\n",
                      s.host, s.exit_status, s.stdout);
     }
   () = fclose (fp);

   () = fprintf(Limit_Fp, "# Finished: %S\n", time);
   variable runtime = sprintf("# Total Runtime: %s\n", elapsed());
   () = fprintf(Limit_Fp, "%s", runtime);
   () = fprintf(stdout, "%s", runtime);
   () = fclose (Limit_Fp);

   foreach (Limits) using ("keys", "values")
     {
        % The value is on top of the stack, the key beneath it.
        variable values = ();
        variable name = ();
        set_par(name, values[0], 0, values[1], values[2]);
     }

   save_par ( Best_Fname );
   exit (0);
}
%}}}

% Script entry point: launch one cl_slave command per free parameter and
% hand the collected exit statuses to finish().
define isis_main () % {{{
{
   process_args();
   start();

   variable slave_argvs;
   slave_argvs = array_map (Array_Type, &build_cmd, get_free_param_names());

   variable exit_status = pvm_ms_run_master (slave_argvs);

   finish(exit_status);
}
% }}}