Friday, February 23, 2018
LDC #73: Running in the Background Part 2 - Synchronizing Tasks
In my last blog post I discussed using background scripts. This week I will elaborate on synchronizing background scripts and their data. If you haven’t read the previous post, don’t worry; you will still be able to follow along. This post does, however, require at least a basic understanding of threading. Part 1 used a single background thread to track user activity within the application, so the user could keep working while our script ran alongside. Another way to use background scripts is to split a task up and process the pieces in parallel. This week’s script is an example of that.
This design is usually referred to as parallelization: taking a task and splitting it into smaller pieces that can run in parallel. The script reads a directory tree to build a list of all the CIKs used in projects within that tree. This task can be performed in parallel because each file can be processed independently. To do this serially we would build a list of files, iterate over the list, and process each file. The parallel version is essentially the same, except that the processing is done by threads. Before we dive into the logistics of the design, we should learn about a better way to share data between scripts.
In the previous blog we used a file to communicate with the background script, but the latest version of Legato adds session variables. Session variables are stored globally in the application, so any script can access them. This means our hooks can store information for use by other hooks. For example, when the application launches we could have the user log into a CRM database and then store the credentials for other hooks like Validate or File Live/Test. Session variables can also be shared with background threads, which is how we will use them here.
Now that we have global variables, we need to discuss the design. We could have our main program spawn a thread for each file. That would work for a small number of files but would get unwieldy for larger numbers. Instead, we can spawn a few worker threads, say one for each processor core, and let them work through the list. This means each thread needs to know which parts of the list have already been processed. We can still iterate over the list, but now many threads will be incrementing the same iterator. If multiple threads incremented the value at the same time, a file could be skipped. Likewise, if multiple threads read the value at the same time, the same file could be processed more than once. If you are new to programming with threads, you may be thinking: why not set a global boolean that indicates a thread may increment the iterator? Let’s check out an example of this:
void increment() {
    int MyValue;
    int rc;
    rc = FALSE;
    while (rc == FALSE) {
        if (GetSessionString("OkayToEdit") == "true") {
            SetSessionValue("OkayToEdit", "false");
            MyValue = GetSessionInteger("MyValue");
            MyValue++;
            SetSessionValue("MyValue", MyValue);
            SetSessionValue("OkayToEdit", "true");
            rc = TRUE;
        }
        Sleep(10);
    }
}
void main() {
    SetSessionValue("MyValue", 0);
    SetSessionValue("OkayToEdit", "true");
    RunBackgroundScript(GetScriptFilename(), "increment");
    RunBackgroundScript(GetScriptFilename(), "increment");
    RunBackgroundScript(GetScriptFilename(), "increment");
    RunBackgroundScript(GetScriptFilename(), "increment");
    Sleep(1000);
    AddMessage("%d", GetSessionInteger("MyValue"));
}
In the sample, the main function starts four background copies of our increment function, waits one second, and then logs the final value of MyValue.
If increment is called while OkayToEdit is “true”, MyValue will be incremented. In a multi-threaded environment, this holds true as well. But consider what happens if a thread gets interrupted after it checks the value of OkayToEdit but before it sets OkayToEdit to “false”. In that window another thread could check the value of OkayToEdit and also fall into the if statement. Now two threads are going to increment the value. Even worse, with two threads inside, what if one of them gets interrupted after reading MyValue but before storing the result of the addition? The other thread reads the same old value, both store the same result, and increment was called twice but MyValue increased only once!
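To make the interleaving described above concrete, here is a small C sketch that replays it step by step on a single real thread, so the outcome is deterministic. The struct and function names are hypothetical. The two boolean/integer fields only stand in for the OkayToEdit and MyValue session variables; this is not Legato code.

```c
#include <assert.h>
#include <stdbool.h>

/* Stand-ins for the OkayToEdit and MyValue session variables. */
typedef struct {
    bool okay_to_edit;
    int  my_value;
} shared_state;

/* What each "thread" keeps locally between its steps. */
typedef struct {
    bool passed_check;
    int  local_value;
} thread_locals;

/* Replays one bad interleaving of two increment() calls, A and B. */
int replay_lost_update(void)
{
    shared_state s = { true, 0 };
    thread_locals a = { false, 0 }, b = { false, 0 };

    a.passed_check = s.okay_to_edit;             /* A: checks flag -> true      */
    b.passed_check = s.okay_to_edit;             /* B: checks flag -> still true */
    if (a.passed_check) s.okay_to_edit = false;  /* A: "locks"                  */
    if (b.passed_check) s.okay_to_edit = false;  /* B: "locks" (too late)       */
    a.local_value = s.my_value;                  /* A: reads 0                  */
    b.local_value = s.my_value;                  /* B: reads 0                  */
    s.my_value = a.local_value + 1;              /* A: writes 1                 */
    s.my_value = b.local_value + 1;              /* B: writes 1 over A's update */
    s.okay_to_edit = true;                       /* A: "unlocks"                */
    s.okay_to_edit = true;                       /* B: "unlocks"                */

    return s.my_value;                           /* two increments, value is 1  */
}
```

Calling `replay_lost_update()` returns 1 even though both "threads" passed the check and performed an increment.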
If you want to see this in action, run the first script above. Make sure you save the .ls file before running it, since the script uses GetScriptFilename. You will most likely get a log message of “4”, and if you run it many times you will get “4” almost every time. Now replace increment with this version:
void increment() {
    int MyValue;
    int rc;
    rc = FALSE;
    while (rc == FALSE) {
        if (GetSessionString("OkayToEdit") == "true") {
            Sleep(100);
            SetSessionValue("OkayToEdit", "false");
            MyValue = GetSessionInteger("MyValue");
            Sleep(100);
            MyValue++;
            SetSessionValue("MyValue", MyValue);
            SetSessionValue("OkayToEdit", "true");
            rc = TRUE;
        }
        Sleep(10);
    }
}
The only difference is the two calls to Sleep, which pause the thread’s execution. Adding these lines only wastes time, but they widen the window in which another thread can interfere, which illustrates the point. Run it now, and you’ll be lucky to get more than “1”. Even though the code looks correct, it is not thread safe. So how do we fix it?
What we need is known as a mutex (short for mutual exclusion). A mutex makes sections of our code mutually exclusive, so only one thread can run that code at a time. The Legato SDK already has all the pieces we need to build our own. The crux of the issue is that another thread may check and set the value in the gap between our script’s check and its set. So we need an atomic operation: one that completes as a single indivisible step, so no other thread can see or change the value partway through. For that, Legato has TestAndSetSessionValue. This function reads a session variable. If the value matches the one specified, it sets the variable to the other specified value, all in a single operation that only one thread can perform at a time. Now let’s look at our improved increment function.
void increment() {
    int MyValue;
    int rc;
    rc = FALSE;
    while (rc == FALSE) {
        rc = TestAndSetSessionValue("OkayToEdit", TASSV_MATCH, "true", "false");
        if (rc == TRUE) {
            MyValue = GetSessionInteger("MyValue");
            MyValue++;
            SetSessionValue("MyValue", MyValue);
            SetSessionValue("OkayToEdit", "true");
        }
        Sleep(10);
    }
}
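For readers who know C, the same pattern looks like this. This is a sketch, not Legato: C11’s `atomic_compare_exchange_strong` plays the role that TestAndSetSessionValue with TASSV_MATCH plays in the increment function above. The helper `test_and_set` and the demo names are hypothetical. Four POSIX threads each increment a shared counter many times through the compare-and-swap lock, and the total always comes out exact.

```c
#include <assert.h>
#include <pthread.h>
#include <stdatomic.h>
#include <stdbool.h>

/* If *var equals match, store new_value and return true, as one atomic step. */
static bool test_and_set(atomic_int *var, int match, int new_value)
{
    int expected = match;
    return atomic_compare_exchange_strong(var, &expected, new_value);
}

static atomic_int okay_to_edit = 1;  /* 1 = "true", 0 = "false" */
static int my_value = 0;             /* only touched while okay_to_edit is 0 */

enum { THREADS = 4, INCREMENTS_PER_THREAD = 10000 };

static void *increment_worker(void *arg)
{
    (void)arg;
    for (int i = 0; i < INCREMENTS_PER_THREAD; i++) {
        /* Spin until this thread is the one that flips 1 -> 0. */
        while (!test_and_set(&okay_to_edit, 1, 0))
            ;
        my_value++;                      /* exactly one thread is here */
        atomic_store(&okay_to_edit, 1);  /* hand the flag back */
    }
    return NULL;
}

int run_increment_demo(void)
{
    pthread_t threads[THREADS];
    my_value = 0;
    for (int i = 0; i < THREADS; i++)
        pthread_create(&threads[i], NULL, increment_worker, NULL);
    for (int i = 0; i < THREADS; i++)
        pthread_join(threads[i], NULL);
    return my_value;  /* always THREADS * INCREMENTS_PER_THREAD */
}
```

The atomic exchange and store also order the plain `my_value` accesses between threads. That ordering is why the counter itself does not need to be atomic.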
We’ve replaced our separate check and set with a single function call. If you want, try inserting Sleep between any of the lines of the new increment function; the script will still run properly. However, there is a catch with this kind of threading control. Notice how the increment function uses a while loop? If OkayToEdit never becomes “true”, that loop will never end. Our increment function is waiting for another thread (or even our main script) to set OkayToEdit, and if that never happens we are stuck in an infinite loop. This is commonly referred to as a deadlock. In this case we can prevent it through proper coding: OkayToEdit is set before the threads are created, and each thread restores the value on every control path. But this doesn’t protect us from outside influences. For a more robust system you may want to limit how long a thread will wait for a resource.
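One way to apply the safety limit suggested above is to cap the number of attempts. The following C sketch is hypothetical and is not a Legato function. `lock_with_limit` tries to take a 0/1 lock a fixed number of times, pausing between attempts much like `Sleep(10)` in the script. It then gives up and reports failure instead of waiting forever.

```c
#define _POSIX_C_SOURCE 199309L  /* for nanosleep() */
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <time.h>

/* Try to change *lock from 0 (free) to 1 (held) at most max_attempts times.
   pause_ns must be below one second. Returns false if the lock never freed up. */
bool lock_with_limit(atomic_int *lock, int max_attempts, long pause_ns)
{
    struct timespec delay = { 0, pause_ns };
    for (int attempt = 0; attempt < max_attempts; attempt++) {
        int expected = 0;
        if (atomic_compare_exchange_strong(lock, &expected, 1))
            return true;           /* acquired */
        nanosleep(&delay, NULL);   /* back off before retrying */
    }
    return false;                  /* caller decides how to recover */
}
```

The caller must check the return value. Treating a timeout as success would reintroduce the race the lock exists to prevent.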
Now that we’ve covered the basics of mutexes, let’s dive into our script so we can see the value of parallelization.
//
// GoFiler Legato Script - Build CIK List
// --------------------------------------
//
// Rev 02-21-2018
//
// (c) 2018 Novaworks, LLC -- All rights reserved.
//
#define GROUPNAME            "ReadCIKs"
#define SVAR_MUTEXLAST       "MutexLast"
#define SVAR_LAST            "LastProcessed"
#define SVAR_MUTEXRESULTS    "MutexResults"
#define SVAR_RESULTS         "Results"
#define SVAR_MUTEXLOG        "MutexLog"
#define SVAR_LOG             "Log"
#define NUMBER_OF_THREADS    4
#define LOOKUP_CIKS          true
void    file_processor  (string target, string sfiles, int count);
void    process_file    (string name);
boolean is_project      (string filetype);
void    log_message     (string message);
void    add_cik         (string cik);
boolean lock_mutex      (string name);
void    unlock_mutex    (string name);
/****************************************/
int main() {                                                    /* Main Function */
/****************************************/
    string  target;                                             /* Target Directory */
    string  files[];                                            /* Files to Read */
    string  sfiles;                                             /* Session Files */
    string  result;                                             /* Result */
    handle  hThreads[];                                         /* Threads */
    int     ix,                                                 /* Index */
            rc,                                                 /* Return Code */
            count;                                              /* Count */
    /* ** Run Load */
    /* * Ask Directory */
    target = GetSetting("Options", "Last Folder");              /* Get the Folder from last time */
    target = BrowseFolder("Load CIKs", target);                 /* Ask User */
    if (target == "") {                                         /* Cancel or Error? */
        return ERROR_CANCEL;                                    /* Leave with cancel */
    }                                                           /* end cancel or error */
    PutSetting("Options", "Last Folder", target);               /* Store it */
    ResetSessionData();                                         /* This is not safe generally */
                                                                /* but in this example is okay */
    /* * Load File List */
    files = EnumerateFiles(AddPaths(target, "*.gfp;*.xml"),     /* Enumerate Files */
                           FOLDER_USE_PROGRESS |                /* Use Progress Window */
                           FOLDER_LOAD_FOLDER_NAMES |           /* Load Folder Names */
                           FOLDER_LOAD_RECURSE);                /* Recursive */
    count = ArrayGetAxisDepth(files);                           /* Get the Total */
    /* * Save List as Session */
    sfiles = ImplodeArray(files, ":");                          /* Convert to String */
    SetSessionValue(GROUPNAME, SVAR_MUTEXLAST, 0);              /* Set Mutex */
    SetSessionValue(GROUPNAME, SVAR_MUTEXRESULTS, 0);           /* Set Mutex */
    SetSessionValue(GROUPNAME, SVAR_MUTEXLOG, 0);               /* Set Mutex */
    SetSessionValue(GROUPNAME, SVAR_LAST, 0);                   /* Set Last Processed */
    /* * Run Processor */
    ResetElapsedTime();                                         /* Reset Time */
    StatusBarMessage("Starting %d threads", NUMBER_OF_THREADS); /* Set Message */
    if (NUMBER_OF_THREADS == 0) {                               /* No Threads */
        file_processor(target, sfiles, count);                  /* Run function */
    }                                                           /* end no threads */
    else {                                                      /* Multiple Threads */
        for (ix = 0; ix < NUMBER_OF_THREADS; ix++) {            /* For each thread */
            hThreads[ix] = RunBackgroundScript(GetScriptFilename(), /* Run Background Script */
                               "file_processor",                /* name of the function */
                               target, sfiles, count);          /* parameters */
        }                                                       /* end for each thread */
    }                                                           /* end multiple threads */
    /* * Wait until done */
    for (ix = 0; ix < NUMBER_OF_THREADS; ix++) {                /* For each thread */
        StatusBarMessage("Waiting for thread %d", ix);          /* Set Message */
        while (TRUE) {                                          /* Loop */
            rc = WaitForObject(hThreads[ix], 100);              /* Wait for it */
            if (IsNotError(rc)) {                               /* Worked? */
                break;                                          /* Stop */
            }                                                   /* end worked */
            Sleep(200);                                         /* Sleep */
        }                                                       /* end loop */
    }                                                           /* end for each thread */
    /* * Write Log */
    result = GetSessionString(GROUPNAME, SVAR_RESULTS);         /* Get Results */
    StringToFile(result, AddPaths(GetScriptFolder(), "results.txt")); /* Write it */
    result = GetSessionString(GROUPNAME, SVAR_LOG);             /* Get Log */
    if (result != "") {                                         /* Has Data? */
        StringToFile(result, AddPaths(GetScriptFolder(), "log.txt")); /* Write it */
    }                                                           /* end has data */
    MessageBox("Completed in %d ms", GetElapsedTime());         /* Tell User */
    return ERROR_NONE;                                          /* Exit w/ No error */
}                                                               /* end main */
/****************************************/
void file_processor(string target, string sfiles, int count) {  /* File Processor */
/****************************************/
    string  files[];                                            /* Files to Read */
    int     current;                                            /* Current Item */
    /* ** File Processor */
    /* * Set up */
    files = ExplodeString(sfiles, ":");                         /* Convert Back to Array */
    current = 0;                                                /* Clear count */
    /* * Loop */
    while (current < count) {                                   /* While Okay */
        /* o Get Last */
        if (lock_mutex(SVAR_MUTEXLAST) == false) {              /* Can't Lock it */
            Sleep(10);                                          /* Sleep */
            continue;                                           /* Try again */
        }                                                       /* end can't lock it */
        current = GetSessionInteger(GROUPNAME, SVAR_LAST);      /* Get it */
        StatusBarMessage("File %d", current);                   /* Set Message */
        SetSessionValue(GROUPNAME, SVAR_LAST, current + 1);     /* Tell next to use next index */
        unlock_mutex(SVAR_MUTEXLAST);                           /* Allow others to get this */
        /* o Safety */
        if (current >= count) {                                 /* Done now? */
            break;                                              /* Stop */
        }                                                       /* end done now */
        /* o Process Entry */
        process_file(AddPaths(target, files[current]));         /* Process it */
    }                                                           /* end while loop */
}                                                               /* end function */
/****************************************/
void process_file(string name) {                                /* Get CIKs from File */
/****************************************/
    handle  hSGML;                                              /* Mapped Data Handle */
    string  s1;                                                 /* Current Element / Text */
    /* ** Get CIK from File */
    /* * Bypass a ton of files */
    if (is_project(GetFileTypeString(name)) == false) {         /* Not one of our files? */
        return;                                                 /* Leave */
    }                                                           /* end not one of our files */
    /* * Open File */
    hSGML = SGMLCreate(name);                                   /* Create SGML Object */
    if (IsError(hSGML)) {                                       /* Failed to Map? */
        log_message("Cannot load file: " + name);               /* Add Error Message */
        return;                                                 /* Done */
    }                                                           /* end failed to map */
    /* * Read File */
    s1 = SGMLNextElement(hSGML);                                /* Get First Element */
    while (s1 != "") {                                          /* While in SGML Code */
        /* o Known Tag */
        if ((FindInString(s1, "cik") >= 0) ||                   /* CIK? */
            (FindInString(s1, "CentralIndexKey") >= 0) ||       /* CIK? */
            (FindInString(s1, "filerid") >= 0)) {               /* FilerId? */
            s1 = SGMLFindClosingElement(hSGML, SP_FCE_TRANSLATED_TEXT); /* Get CIK */
            if (GetStringLength(s1) == 10) {                    /* Correct Length? */
                add_cik(s1);                                    /* Add it */
            }                                                   /* end correct length */
        }                                                       /* end cik or filerid */
        /* o Next item */
        s1 = SGMLNextElement(hSGML);                            /* Get Next Element */
    }                                                           /* end parse loop */
    CloseHandle(hSGML);                                         /* Close handle to file */
}                                                               /* end function */
/****************************************/
boolean is_project(string filetype) {                           /* Checks for Project */
/****************************************/
    switch (filetype) {                                         /* Switch on the filetype */
        case "FT_XML_SECTION_16":                               /* if section 16 */
            return true;                                        /* return true */
        case "FT_XML_FORM_13H":                                 /* if form 13h */
            return true;                                        /* return true */
        case "FT_XML_FORM_C":                                   /* if form c */
            return true;                                        /* return true */
        case "FT_XML_FORM_13F":                                 /* if 13f */
            return true;                                        /* return true */
        case "FT_XML_FORM_D":                                   /* if form d */
            return true;                                        /* return true */
        case "FT_XML_FORM_MA":                                  /* if form ma */
            return true;                                        /* return true */
        case "FT_XML_FORM_N_MFP":                               /* if form nmfp */
            return true;                                        /* return true */
        case "FT_XML_FORM_N_SAR":                               /* if nsar */
            return true;                                        /* return true */
        case "FT_XML_EDGAR":                                    /* if normal EDGAR XML */
            return true;                                        /* return true */
        case "FT_XFDL":                                         /* if old school XFDL */
            return true;                                        /* return true */
        case "FT_GFP_3X_ELO":                                   /* if GoFiler EDGARLinkOnline */
            return true;                                        /* return true */
        case "FT_GFP_3X_13H":                                   /* if 13H Project File */
            return true;                                        /* return true */
        case "FT_GFP_3X_13F":                                   /* if 13F Project File */
            return true;                                        /* return true */
        case "FT_GFP_3X_MA":                                    /* if MA Project File */
            return true;                                        /* return true */
        case "FT_GOFILER_PROJECT_3X":                           /* if other GoFiler 3.x project file */
            return true;                                        /* return true */
        case "FT_GOFILER_PROJECT":                              /* if old GoFiler Project File */
            return true;                                        /* return true */
    }                                                           /* end switch */
    return false;                                               /* return false */
}                                                               /* end function */
/****************************************/
void log_message(string msg) {                                  /* Log Message */
/****************************************/
    string  log;                                                /* Log */
    /* ** Log Message */
    /* * Get Mutex */
    if (lock_mutex(SVAR_MUTEXLOG) == false) {                   /* Worked? */
        return;                                                 /* Leave if this fails */
    }                                                           /* end worked */
    /* * Write Log */
    log = GetSessionString(GROUPNAME, SVAR_LOG);                /* Get Log */
    log += msg + "\r\n";                                        /* Add Message */
    SetSessionValue(GROUPNAME, SVAR_LOG, log);                  /* Save to Log */
    /* * Clean up */
    unlock_mutex(SVAR_MUTEXLOG);                                /* unlock it */
}                                                               /* end function */
/****************************************/
void add_cik(string cik) {                                      /* Add CIK */
/****************************************/
    string  ciks;                                               /* Result List */
    string  company[];                                          /* Company */
    /* ** Add CIK */
    /* * Get Data */
    if (LOOKUP_CIKS) {                                          /* Lookup CIKs? */
        company = EDGARLookupCIK(cik);                          /* Get CIK Data */
        company["CompanyName"] = ": " + company["CompanyName"]; /* Add Prefix */
    }                                                           /* end lookup CIKs */
    else {                                                      /* No Lookup */
        company["CompanyName"] = "";                            /* Set to Empty */
    }                                                           /* end no lookup */
    /* * Get Mutex */
    if (lock_mutex(SVAR_MUTEXRESULTS) == false) {               /* Worked? */
        return;                                                 /* Leave if this fails */
    }                                                           /* end worked */
    /* * Add CIK */
    ciks = GetSessionString(GROUPNAME, SVAR_RESULTS);           /* Get Results */
    if ((FindInString(ciks, cik + ":") < 0) &&                  /* Not in list (with name)? */
        (FindInString(ciks, cik + "\r\n") < 0)) {               /* Not in list (no lookup)? */
        ciks += cik + company["CompanyName"] + "\r\n";          /* Add CIK */
        SetSessionValue(GROUPNAME, SVAR_RESULTS, ciks);         /* Save Results */
    }                                                           /* end not already in list */
    /* * Clean up */
    unlock_mutex(SVAR_MUTEXRESULTS);                            /* unlock it */
}                                                               /* end function */
/****************************************/
boolean lock_mutex(string mutex) {                              /* Lock Mutex */
/****************************************/
    int     rc;                                                 /* Return Code */
    /* ** Lock Mutex */
    /* * Block until okay */
    while (TRUE) {                                              /* Infinite Loop */
        // This would be a good spot to put a session variable that could signal all threads to stop
        rc = TestAndSetSessionValue(GROUPNAME, mutex, TASSV_MATCH, 0, 1); /* Try to Obtain Mutex */
        if (IsError(rc)) {                                      /* Error? */
            exit;                                               /* This is bad */
        }                                                       /* end error */
        if (rc == TRUE) {                                       /* Worked? */
            return TRUE;                                        /* Leave with okay */
        }                                                       /* end worked */
    }                                                           /* end infinite loop */
    return FALSE;                                               /* Shouldn't get here */
}                                                               /* end function */
/****************************************/
void unlock_mutex(string mutex) {                               /* Unlock Mutex */
/****************************************/
    int     rc;                                                 /* Return Code */
    /* ** Unlock Mutex */
    /* * Change it */
    rc = TestAndSetSessionValue(GROUPNAME, mutex, TASSV_MATCH, 1, 0); /* Clear Mutex (only if already set) */
    if (IsError(rc)) {                                          /* Error? */
        exit;                                                   /* This is bad */
    }                                                           /* end error */
    if (rc == FALSE) {                                          /* Failed? */
        return;                                                 /* This is bad (already unlocked) */
    }                                                           /* end failed */
}                                                               /* end function */
Let’s start with the defines. Since we are using session variables, it is a good idea to create defines for their names. This makes them easy to change and makes it easy to verify they don’t conflict with other scripts. All of the session variable functions accept an optional group name that we can use to further prevent conflicts with other scripts. So we define the group name and three session variables, as well as three mutex variables to protect those session variables. Two more defines control the experiment: the number of threads to create (0 or more) and whether the script should look up CIKs on the EDGAR system. The lookup is slow and highly variable because it requires communicating with a server across the Internet. We’ll therefore time the file processing with and without it, to compare an optimal environment against a production environment.
We also have our function prototypes (forward declarations).
#define GROUPNAME            "ReadCIKs"
#define SVAR_MUTEXLAST       "MutexLast"
#define SVAR_LAST            "LastProcessed"
#define SVAR_MUTEXRESULTS    "MutexResults"
#define SVAR_RESULTS         "Results"
#define SVAR_MUTEXLOG        "MutexLog"
#define SVAR_LOG             "Log"
#define NUMBER_OF_THREADS    4
#define LOOKUP_CIKS          true
void    file_processor  (string target, string sfiles, int count);
void    process_file    (string name);
boolean is_project      (string filetype);
void    log_message     (string message);
void    add_cik         (string cik);
boolean lock_mutex      (string name);
void    unlock_mutex    (string name);
The main function gets the directory from the user, loads the file list, starts the threads (if any), waits for the threads to complete, and outputs the results. The “Ask Directory” section of the code uses BrowseFolder to get the directory. It also saves the directory for next time and runs the ResetSessionData function. For our example this is a convenient way to clear out all the data we’ve added. In a production script, however, this function is dangerous, because it deletes ALL session variables, including those of other scripts. We would be better off writing a cleanup function that deletes only our variables, but this is quicker for the sample.
    /* * Ask Directory */
    target = GetSetting("Options", "Last Folder");              /* Get the Folder from last time */
    target = BrowseFolder("Load CIKs", target);                 /* Ask User */
    if (target == "") {                                         /* Cancel or Error? */
        return ERROR_CANCEL;                                    /* Leave with cancel */
    }                                                           /* end cancel or error */
    PutSetting("Options", "Last Folder", target);               /* Store it */
    ResetSessionData();                                         /* This is not safe generally */
The “Load File List” and “Save List as Session” sections enumerate the project and XML files in the selected directory tree into an array. The array is then converted to a single string using ImplodeArray so that it can be passed to our threads. We also set our session variables to their default values using SetSessionValue.
    /* * Load File List */
    files = EnumerateFiles(AddPaths(target, "*.gfp;*.xml"),     /* Enumerate Files */
                           FOLDER_USE_PROGRESS |                /* Use Progress Window */
                           FOLDER_LOAD_FOLDER_NAMES |           /* Load Folder Names */
                           FOLDER_LOAD_RECURSE);                /* Recursive */
    count = ArrayGetAxisDepth(files);                           /* Get the Total */
    /* * Save List as Session */
    sfiles = ImplodeArray(files, ":");                          /* Convert to String */
    SetSessionValue(GROUPNAME, SVAR_MUTEXLAST, 0);              /* Set Mutex */
    SetSessionValue(GROUPNAME, SVAR_MUTEXRESULTS, 0);           /* Set Mutex */
    SetSessionValue(GROUPNAME, SVAR_MUTEXLOG, 0);               /* Set Mutex */
    SetSessionValue(GROUPNAME, SVAR_LAST, 0);                   /* Set Last Processed */
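Flattening the file list into one delimited string and splitting it back works only if no entry contains the delimiter. A colon is a safe choice because Windows file names cannot contain one. The following C sketch shows that round trip. `implode` and `explode` are hypothetical helpers that imitate what ImplodeArray and ExplodeString are used for here. They are not the Legato implementations.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Join names with a one-character delimiter into a newly allocated string.
   Assumes no name contains the delimiter. Caller frees the result. */
char *implode(const char *names[], size_t count, char delim)
{
    size_t total = 1;                              /* terminating NUL */
    for (size_t i = 0; i < count; i++)
        total += strlen(names[i]) + 1;             /* name + delimiter */
    char *out = malloc(total);
    if (out == NULL)
        return NULL;
    char *p = out;
    for (size_t i = 0; i < count; i++) {
        size_t len = strlen(names[i]);
        memcpy(p, names[i], len);
        p += len;
        if (i + 1 < count)
            *p++ = delim;                          /* no trailing delimiter */
    }
    *p = '\0';
    return out;
}

/* Split in place: each delimiter becomes a NUL and parts[] points at the
   pieces. Returns how many pieces were stored (at most max_parts). */
size_t explode(char *joined, char delim, char *parts[], size_t max_parts)
{
    size_t n = 0;
    char *start = joined;
    for (char *p = joined; ; p++) {
        if (*p == delim || *p == '\0') {
            int at_end = (*p == '\0');
            if (n < max_parts)
                parts[n++] = start;
            *p = '\0';
            if (at_end)
                break;
            start = p + 1;
        }
    }
    return n;
}
```

For example, the names `a\one.gfp`, `b\two.xml`, and `three.xml` join to `a\one.gfp:b\two.xml:three.xml` and split back into the same three names.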
The “Run Processor” section uses the NUMBER_OF_THREADS define. It either starts that many threads or, if the define is 0, runs the processing function directly. We store the thread handles in an array so we can wait for them to finish later. The file_processor function takes our directory, file list, and file count as parameters. Since RunBackgroundScript lets us pass that information, we only have to load the directory list once. We also call ResetElapsedTime to reset the script’s run time so we can track how long the file processing takes. Without the reset, the measurement would include the time the user spent selecting a directory.
    /* * Run Processor */
    ResetElapsedTime();                                         /* Reset Time */
    StatusBarMessage("Starting %d threads", NUMBER_OF_THREADS); /* Set Message */
    if (NUMBER_OF_THREADS == 0) {                               /* No Threads */
        file_processor(target, sfiles, count);                  /* Run function */
    }                                                           /* end no threads */
    else {                                                      /* Multiple Threads */
        for (ix = 0; ix < NUMBER_OF_THREADS; ix++) {            /* For each thread */
            hThreads[ix] = RunBackgroundScript(GetScriptFilename(), /* Run Background Script */
                               "file_processor",                /* name of the function */
                               target, sfiles, count);          /* parameters */
        }                                                       /* end for each thread */
    }                                                           /* end multiple threads */
The “Wait until done” and “Write Log” sections wait for the threads to finish (if we started any). They then take the session data that file_processor created and write it to results.txt and log.txt in the script’s folder. We also use the GetElapsedTime function to see how long the processing took.
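    /* * Wait until done */
    for (ix = 0; ix < NUMBER_OF_THREADS; ix++) {                /* For each thread */
        StatusBarMessage("Waiting for thread %d", ix);          /* Set Message */
        while (TRUE) {                                          /* Loop */
            rc = WaitForObject(hThreads[ix], 100);              /* Wait for it */
            if (IsNotError(rc)) {                               /* Worked? */
                break;                                          /* Stop */
            }                                                   /* end worked */
            Sleep(200);                                         /* Sleep */
        }                                                       /* end loop */
    }                                                           /* end for each thread */
    /* * Write Log */
    result = GetSessionString(GROUPNAME, SVAR_RESULTS);         /* Get Results */
    StringToFile(result, AddPaths(GetScriptFolder(), "results.txt")); /* Write it */
    result = GetSessionString(GROUPNAME, SVAR_LOG);             /* Get Log */
    if (result != "") {                                         /* Has Data? */
        StringToFile(result, AddPaths(GetScriptFolder(), "log.txt")); /* Write it */
    }                                                           /* end has data */
    MessageBox("Completed in %d ms", GetElapsedTime());         /* Tell User */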
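The launch-then-wait structure of main can be sketched with POSIX threads. In this sketch, `file_processor_stub` is a hypothetical placeholder for the real work, and `pthread_join` takes the place of the WaitForObject polling loop. The sketch runs the work inline when asked for zero threads, keeps a handle for every worker it starts, and waits for all of them before returning.

```c
#include <assert.h>
#include <pthread.h>
#include <stdatomic.h>

enum { MAX_THREADS = 16 };

static atomic_int runs = 0;  /* counts how many times the work ran */

/* Hypothetical placeholder for file_processor(). */
static void *file_processor_stub(void *arg)
{
    (void)arg;
    atomic_fetch_add(&runs, 1);
    return NULL;
}

/* 0 threads: run inline. Otherwise start workers, keep their handles,
   then wait for every one that actually started. Returns threads started. */
int run_processor(int number_of_threads)
{
    pthread_t handles[MAX_THREADS];
    if (number_of_threads > MAX_THREADS)
        number_of_threads = MAX_THREADS;
    if (number_of_threads <= 0) {
        file_processor_stub(NULL);
        return 0;
    }
    int started = 0;
    for (int i = 0; i < number_of_threads; i++) {
        if (pthread_create(&handles[i], NULL, file_processor_stub, NULL) != 0)
            break;                       /* stop launching; still join the rest */
        started++;
    }
    for (int i = 0; i < started; i++)
        pthread_join(handles[i], NULL);  /* blocks until that worker exits */
    return started;
}
```

Joining only the threads that actually started keeps the wait loop safe even if thread creation fails partway through.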
That covers the main function. Following the program flow, the next function to discuss is file_processor, which iterates over the list of files and processes them. To understand it, though, we first need to look at our mutex functions. Based on the discussion above, you may rightly guess that these functions rely on TestAndSetSessionValue inside a loop, and the code is almost that simple. To lock, we check whether our mutex value is 0 (unlocked) and, if so, set it to 1 (locked) and leave; if not, we loop and try again. Unlocking doesn’t need the loop. We should be the thread that locked the mutex, so there is nothing to wait for. If the value isn’t what we expect, that is an error. Pretty straightforward. With these two functions we can protect any of our session variables by using our mutex session variables.
/****************************************/
boolean lock_mutex(string mutex) {                              /* Lock Mutex */
/****************************************/
    int     rc;                                                 /* Return Code */
    /* ** Lock Mutex */
    /* * Block until okay */
    while (TRUE) {                                              /* Infinite Loop */
        // This would be a good spot to put a session variable that could signal all threads to stop
        rc = TestAndSetSessionValue(GROUPNAME, mutex, TASSV_MATCH, 0, 1); /* Try to Obtain Mutex */
        if (IsError(rc)) {                                      /* Error? */
            exit;                                               /* This is bad */
        }                                                       /* end error */
        if (rc == TRUE) {                                       /* Worked? */
            return TRUE;                                        /* Leave with okay */
        }                                                       /* end worked */
    }                                                           /* end infinite loop */
    return FALSE;                                               /* Shouldn't get here */
}                                                               /* end function */
/****************************************/
void unlock_mutex(string mutex) {                               /* Unlock Mutex */
/****************************************/
    int     rc;                                                 /* Return Code */
    /* ** Unlock Mutex */
    /* * Change it */
    rc = TestAndSetSessionValue(GROUPNAME, mutex, TASSV_MATCH, 1, 0); /* Clear Mutex (only if already set) */
    if (IsError(rc)) {                                          /* Error? */
        exit;                                                   /* This is bad */
    }                                                           /* end error */
    if (rc == FALSE) {                                          /* Failed? */
        return;                                                 /* This is bad (already unlocked) */
    }                                                           /* end failed */
}                                                               /* end function */
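Here is the same lock/unlock pair as a C11 sketch. The function names mirror the script but are written in C, so treat them as illustrative. The lock loops on a compare-and-swap from 0 to 1. The unlock is a single compare-and-swap from 1 to 0 that reports failure when the mutex was not held, the case the Legato version flags with “This is bad (already unlocked)”. One C-specific detail: a failed exchange overwrites `expected`, so the loop must reset it.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>

/* Blocks until this caller changes the mutex from 0 (free) to 1 (held). */
void lock_mutex(atomic_int *mutex)
{
    int expected = 0;
    while (!atomic_compare_exchange_weak(mutex, &expected, 1))
        expected = 0;  /* a failed exchange stored the current value; reset it */
}

/* Changes 1 -> 0 in one step. Returns false if the mutex was not held,
   which points to a logic error such as unlocking twice. */
bool unlock_mutex(atomic_int *mutex)
{
    int expected = 1;
    return atomic_compare_exchange_strong(mutex, &expected, 0);
}
```

The weak form is used in the loop because it may fail spuriously but is cheaper on some hardware. The loop retries anyway.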
Now that we have our mutex functions, we can go back to file_processor. We start by converting our string of files back into an array using ExplodeString. Then we fall into our main processing loop. We try to lock our iterator mutex using lock_mutex. If we can’t lock it, we Sleep and try again. Because lock_mutex blocks until it succeeds, this branch is only a safeguard. Once we hold the lock, we are certain we are the only thread accessing this session variable. We read our current position and store the next index for the next thread to read. We then call unlock_mutex to allow another thread to read the updated position. Finally, we make sure we are still within the list of files and, if so, call process_file. When you lock and unlock your mutexes matters. As a rule of thumb, unlock a mutex as soon as you no longer need the resource. Also, when dealing with multiple mutexes, the order in which you lock them is very important. Threads that lock the same mutexes in different orders can deadlock.
/****************************************/
void file_processor(string target, string sfiles, int count) {  /* File Processor */
/****************************************/
    string  files[];                                            /* Files to Read */
    int     current;                                            /* Current Item */
    /* ** File Processor */
    /* * Set up */
    files = ExplodeString(sfiles, ":");                         /* Convert Back to Array */
    current = 0;                                                /* Clear count */
    /* * Loop */
    while (current < count) {                                   /* While Okay */
        /* o Get Last */
        if (lock_mutex(SVAR_MUTEXLAST) == false) {              /* Can't Lock it */
            Sleep(10);                                          /* Sleep */
            continue;                                           /* Try again */
        }                                                       /* end can't lock it */
        current = GetSessionInteger(GROUPNAME, SVAR_LAST);      /* Get it */
        StatusBarMessage("File %d", current);                   /* Set Message */
        SetSessionValue(GROUPNAME, SVAR_LAST, current + 1);     /* Tell next to use next index */
        unlock_mutex(SVAR_MUTEXLAST);                           /* Allow others to get this */
        /* o Safety */
        if (current >= count) {                                 /* Done now? */
            break;                                              /* Stop */
        }                                                       /* end done now */
        /* o Process Entry */
        process_file(AddPaths(target, files[current]));         /* Process it */
    }                                                           /* end while loop */
}                                                               /* end function */
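The claim-then-release loop in file_processor translates directly to POSIX threads. In this sketch, `times_processed` is a hypothetical stand-in for process_file, and the item count and worker count are arbitrary. Each worker reads and advances the shared index while holding the mutex. It unlocks before doing the slow work, then checks the bound. If every item ends up processed exactly once, nothing was skipped or duplicated.

```c
#include <assert.h>
#include <pthread.h>

enum { FILE_COUNT = 1000, WORKERS = 4 };

static pthread_mutex_t last_lock = PTHREAD_MUTEX_INITIALIZER;
static int last_processed = 0;           /* next index to hand out */
static int times_processed[FILE_COUNT];  /* stands in for process_file() */

static void *file_processor(void *arg)
{
    (void)arg;
    for (;;) {
        /* Claim an index: read and advance it while holding the lock. */
        pthread_mutex_lock(&last_lock);
        int current = last_processed;
        last_processed = current + 1;
        pthread_mutex_unlock(&last_lock);  /* release before the slow work */

        if (current >= FILE_COUNT)         /* list exhausted */
            break;
        times_processed[current]++;        /* no other thread holds this index */
    }
    return NULL;
}

void run_workers(void)
{
    pthread_t t[WORKERS];
    for (int i = 0; i < WORKERS; i++)
        pthread_create(&t[i], NULL, file_processor, NULL);
    for (int i = 0; i < WORKERS; i++)
        pthread_join(t[i], NULL);
}
```

Holding the lock only for the two-line claim is what lets the workers overlap. If the lock were held across the processing step, the threads would effectively run one at a time.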
Logically, the next function to talk about is the one file_processor calls, process_file. This function does all the heavy lifting and is the part that runs in parallel. You can take the framework above, replace process_file with almost any kind of work, and process a list of data in parallel. Let’s dig into the function.
First we check whether the file is worth processing. Since there are many different types of XML files, we only want to look at the ones that are EDGAR projects. For this we use a variant of Steve’s is_obfuscatable function from LDC #55: is_project returns true if the file type is one of the known EDGAR project types. If it is not a project, we are done and we leave. If it is, we need to read the CIK, so we use the SGML Object to parse the XML. If the SGML Object fails to load the file, it is most likely due to a file error (access denied, etc.), so we log the error using our log_message function. Once we have the parser, we loop looking for tags that are commonly used to store CIKs. I’m sure this list isn’t exhaustive, but it could easily be extended. When we find one of these tags, we ask the parser for the tag’s contents using SGMLFindClosingElement. If the text is 10 characters long (the length of a CIK), we pass it to our add_cik function. We don’t stop the loop, because a project may contain more than one CIK. We parse until the end of the file, then close the handle and leave. Processing for this file is complete!
/****************************************/
void process_file(string name) {                                /* Get CIKs from File */
/****************************************/
    handle  hSGML;                                              /* Mapped Data Handle */
    string  s1;                                                 /* Current Element / Text */
    /* ** Get CIK from File */
    /* * Bypass a ton of files */
    if (is_project(GetFileTypeString(name)) == false) {         /* Not one of our files? */
        return;                                                 /* Leave */
    }                                                           /* end not one of our files */
    /* * Open File */
    hSGML = SGMLCreate(name);                                   /* Create SGML Object */
    if (IsError(hSGML)) {                                       /* Failed to Map? */
        log_message("Cannot load file: " + name);               /* Add Error Message */
        return;                                                 /* Done */
    }                                                           /* end failed to map */
    /* * Read File */
    s1 = SGMLNextElement(hSGML);                                /* Get First Element */
    while (s1 != "") {                                          /* While in SGML Code */
        /* o Known Tag */
        if ((FindInString(s1, "cik") >= 0) ||                   /* CIK? */
            (FindInString(s1, "CentralIndexKey") >= 0) ||       /* CIK? */
            (FindInString(s1, "filerid") >= 0)) {               /* FilerId? */
            s1 = SGMLFindClosingElement(hSGML, SP_FCE_TRANSLATED_TEXT); /* Get CIK */
            if (GetStringLength(s1) == 10) {                    /* Correct Length? */
                add_cik(s1);                                    /* Add it */
            }                                                   /* end correct length */
        }                                                       /* end cik or filerid */
        /* o Next item */
        s1 = SGMLNextElement(hSGML);                            /* Get Next Element */
    }                                                           /* end parse loop */
    CloseHandle(hSGML);                                         /* Close handle to file */
}                                                               /* end function */
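The tag test in process_file is a plain substring check followed by a length check. The C sketch below restates it so the rule is easy to try in isolation. The function names are hypothetical. `strstr` is case-sensitive, and whether Legato’s FindInString is also case-sensitive is an assumption here. Like the script, the sketch checks only that the text is 10 characters long, not that it is all digits.

```c
#include <assert.h>
#include <stdbool.h>
#include <string.h>

/* True if the element text mentions one of the tag names the script looks for. */
bool is_cik_tag(const char *element)
{
    return strstr(element, "cik") != NULL
        || strstr(element, "CentralIndexKey") != NULL
        || strstr(element, "filerid") != NULL;
}

/* Mirrors the script: a known tag whose text is exactly 10 characters. */
bool looks_like_cik(const char *element, const char *text)
{
    return is_cik_tag(element) && strlen(text) == 10;
}
```

For example, `looks_like_cik("<cik>", "0001234567")` is true, while a 7-character value or an unrelated tag such as `<name>` is rejected.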
You may be thinking, “But what do add_cik and log_message do?” They store the results in session variables. If only it were that simple. Remember, we are using multiple threads, so to save data to these lists safely we need a mutex. Luckily, we have the lock_mutex and unlock_mutex functions. add_cik and log_message are nearly identical, so I will discuss them together. The add_cik function has additional processing to look up a CIK, which has been covered in other blogs. That slow lookup happens before the mutex is locked, so other threads aren’t kept waiting on the network. After all the prep work, we lock the mutex for our session variable (SVAR_MUTEXLOG or SVAR_MUTEXRESULTS). Once it is locked, we read the most up-to-date version of the session variable, add our data to it, and save it back. After that is done we unlock the mutex. The add_cik function also checks whether a CIK is already in the list, but the process is the same.
/****************************************/
void log_message(string msg) {                                  /* Log Message */
/****************************************/
    string  log;                                                /* Log */
    /* ** Log Message */
    /* * Get Mutex */
    if (lock_mutex(SVAR_MUTEXLOG) == false) {                   /* Worked? */
        return;                                                 /* Leave if this fails */
    }                                                           /* end worked */
    /* * Write Log */
    log = GetSessionString(GROUPNAME, SVAR_LOG);                /* Get Log */
    log += msg + "\r\n";                                        /* Add Message */
    SetSessionValue(GROUPNAME, SVAR_LOG, log);                  /* Save to Log */
    /* * Clean up */
    unlock_mutex(SVAR_MUTEXLOG);                                /* unlock it */
}                                                               /* end function */
/****************************************/
void add_cik(string cik) {                                      /* Add CIK */
/****************************************/
    string  ciks;                                               /* Result List */
    string  company[];                                          /* Company */
    /* ** Add CIK */
    /* * Get Data */
    if (LOOKUP_CIKS) {                                          /* Lookup CIKs? */
        company = EDGARLookupCIK(cik);                          /* Get CIK Data */
        company["CompanyName"] = ": " + company["CompanyName"]; /* Add Prefix */
    }                                                           /* end lookup CIKs */
    else {                                                      /* No Lookup */
        company["CompanyName"] = "";                            /* Set to Empty */
    }                                                           /* end no lookup */
    /* * Get Mutex */
    if (lock_mutex(SVAR_MUTEXRESULTS) == false) {               /* Worked? */
        return;                                                 /* Leave if this fails */
    }                                                           /* end worked */
    /* * Add CIK */
    ciks = GetSessionString(GROUPNAME, SVAR_RESULTS);           /* Get Results */
    if ((FindInString(ciks, cik + ":") < 0) &&                  /* Not in list (with name)? */
        (FindInString(ciks, cik + "\r\n") < 0)) {               /* Not in list (no lookup)? */
        ciks += cik + company["CompanyName"] + "\r\n";          /* Add CIK */
        SetSessionValue(GROUPNAME, SVAR_RESULTS, ciks);         /* Save Results */
    }                                                           /* end not already in list */
    /* * Clean up */
    unlock_mutex(SVAR_MUTEXRESULTS);                            /* unlock it */
}                                                               /* end function */
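The shape of add_cik can be sketched in C under a few assumptions. Here `lookup_company` is a hypothetical stand-in for a slow call such as EDGARLookupCIK, and a fixed array replaces the session string. The slow lookup happens outside the lock. The duplicate check and the append happen together under the lock, so two threads adding the same CIK cannot both decide it is new.

```c
#include <assert.h>
#include <pthread.h>
#include <stdio.h>
#include <string.h>

enum { MAX_CIKS = 256, LINE_LEN = 128 };

static pthread_mutex_t results_lock = PTHREAD_MUTEX_INITIALIZER;
static char results[MAX_CIKS][LINE_LEN];  /* "CIK: Company" lines */
static int result_count = 0;

/* Hypothetical stand-in for a slow network lookup. */
static void lookup_company(const char *cik, char *name, size_t size)
{
    snprintf(name, size, "Company %s", cik);
}

void add_cik(const char *cik)
{
    char name[64];
    lookup_company(cik, name, sizeof name);  /* slow part runs unlocked */

    pthread_mutex_lock(&results_lock);
    size_t cik_len = strlen(cik);
    int found = 0;
    for (int i = 0; i < result_count; i++) {
        /* Match the CIK at the start of a line followed by the separator. */
        if (strncmp(results[i], cik, cik_len) == 0 && results[i][cik_len] == ':') {
            found = 1;
            break;
        }
    }
    if (!found && result_count < MAX_CIKS)
        snprintf(results[result_count++], LINE_LEN, "%s: %s", cik, name);
    pthread_mutex_unlock(&results_lock);
}
```

Doing the check and the append inside the same critical section is the important part. Checking first and locking only for the append would reopen the race described at the start of the article.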
Now that we’ve reviewed the script, let’s take a look at what using threading and parallelization gained. I did a few trial runs changing the value of NUMBER_OF_THREADS and the results speak for themselves.
In a sample directory of just over 600 project files, I ran the program 5 times with no threading, one thread and four threads, and then compared the results. Here are the results for running the program without looking up CIKs on the EDGAR System:
And here’s how long each run took when it did look up the CIKs on EDGAR:
With four threads and no EDGAR lookup, the script’s run time decreased by 70%! With lookups, run time decreased by 50%. It’s impressive what background threads can accomplish. Try improving the performance of your next script using multi-threading and parallelization.
David Theis has been developing software for Windows operating systems for over fifteen years. He has a Bachelor of Science in Computer Science from the Rochester Institute of Technology and co-founded Novaworks in 2006. He is the Vice President of Development and is one of the primary developers of GoFiler, a financial reporting software package designed to create and file EDGAR XML, HTML, and XBRL documents to the U.S. Securities and Exchange Commission.
Additional Resources
Legato Script Developers LinkedIn Group
Primer: An Introduction to Legato