# encoding: ascii
# api: powershell
# title: Split-Job V
# description: The Split-Job function provides easy multithreading at the command line or in a script. It was created from a system administrator’s point of view and is compatible with PS v1. Supports importing functions, variables and snapins. For history and background please visit http://www.jansveld.net/powershell.
# version: 1.2
# type: function
# author: Arnoud Jansveld
# license: CC0
# function: Test-WebServer
# x-poshcode-id: 2621
# x-derived-from-id: 3679
# x-archived: 2015-05-06T18:28:29
# x-published: 2011-04-21T14:43:00
#
# This is Version 1.2 and only work in PowerShell V 2. It supports powershell_ise and will cancel pipelines if escape is pressed. It also includes various other minor improvements.
#
#requires -version 2.0
<#
################################################################################
## Run commands in multiple concurrent pipelines
## by Arnoud Jansveld - www.jansveld.net/powershell
##
## Basic "drop in" usage examples:
## - Functions that accept pipelined input:
## Without Split-Job:
## Get-Content hosts.txt | MyFunction | Export-Csv results.csv
## With Split-Job:
## Get-Content hosts.txt | Split-Job {MyFunction} | Export-Csv results.csv
## - Functions that do not accept pipelined input (use foreach):
## Without Split-Job:
## Get-Content hosts.txt |% { .\MyScript.ps1 -ComputerName $_ } | Export-Csv results.csv
## With Split-Job:
## Get-Content hosts.txt | Split-Job {%{ .\MyScript.ps1 -ComputerName $_ }} | Export-Csv results.csv
##
## Example with an imported function:
## function Test-WebServer ($ComputerName) {
## $WebRequest = [System.Net.WebRequest]::Create("http://$ComputerName")
## $WebRequest.GetResponse()
## }
## Get-Content hosts.txt | Split-Job {%{Test-WebServer $_ }} -Function Test-WebServer | Export-Csv results.csv
##
## Example with importing a module
## Get-Content Clusters.txt | Split-Job { % { Get-Cluster -Name $_ } } -InitializeScript { Import-Module FailoverClusters }
##
##
## Version History
## 1.2 Changes by Stephen Mills - stephenmills at hotmail dot com
## Only works with PowerShell V2
## Modified error output to use ErrorRecord parameter of Write-Error - catches Category Info then.
## Works correctly in powershell_ise. Previous version would let pipelines continue if ESC was pressed. If Escape pressed, then it will do an async cancel of the pipelines and exit.
## Add seconds remaining to progress bar
## Parameters Added and related functionality:
## InitializeScript - allows to have custom scripts to initilize ( Import-Module ...), parameter might be renamed Begin in the future.
## MaxDuration - Cancel all pending and in process items in queue if the number of seconds is reached before all input is done.
## ProgressInfo - Allows you to add additional text to progress bar
## NoProgress - Hide Progress Bar
## DisplayInterval - frequency to update Progress bar in milliseconds
## InputObject - not yet used, planned to be used in future to support start processing the queue before pipeline isn't finished yet
## Added example for importing a module.
## 1.0 First version posted on poshcode.org
## Additional runspace error checking and cleanup
## 0.93 Improve error handling: errors originating in the Scriptblock now
## have more meaningful output
## Show additional info in the progress bar (thanks Stephen Mills)
## Add SnapIn parameter: imports (registered) PowerShell snapins
## Add Function parameter: imports functions
## Add SplitJobRunSpace variable; allows scripts to test if they are
## running in a runspace
## 0.92 Add UseProfile switch: imports the PS profile
## Add Variable parameter: imports variables
## Add Alias parameter: imports aliases
## Restart pipeline if it stops due to an error
## Set the current path in each runspace to that of the calling process
## 0.91 Revert to v 0.8 input syntax for the script block
## Add error handling for empty input queue
## 0.9 Add logic to distinguish between scriptblocks and cmdlets or scripts:
## if a ScriptBlock is specified, a foreach {} wrapper is added
## 0.8 Adds a progress bar
## 0.7 Stop adding runspaces if the queue is already empty
## 0.6 First version. Inspired by Gaurhoth's New-TaskPool script
################################################################################
#>
function Split-Job
{
param (
[Parameter(Position=0, Mandatory=$true)]$Scriptblock,
[Parameter()][int]$MaxPipelines=10,
[Parameter()][switch]$UseProfile,
[Parameter()][string[]]$Variable,
[Parameter()][string[]]$Function = @(),
[Parameter()][string[]]$Alias = @(),
[Parameter()][string[]]$SnapIn,
[Parameter()][float]$MaxDuration = $( [Int]::MaxValue ),
[Parameter()][string]$ProgressInfo ='',
[Parameter()][int]$ProgressID = 0,
[Parameter()][switch]$NoProgress,
[Parameter()][int]$DisplayInterval = 300,
[Parameter()][scriptblock]$InitializeScript,
[Parameter(ValueFromPipeline=$true)][object[]]$InputObject
)
begin
{
$StartTime = Get-Date
#$DisplayTime = $StartTime.AddMilliseconds( - $DisplayInterval )
$ExitForced = $false
function Init ($InputQueue){
# Create the shared thread-safe queue and fill it with the input objects
$Queue = [Collections.Queue]::Synchronized([Collections.Queue]@($InputQueue))
$QueueLength = $Queue.Count
# Do not create more runspaces than input objects
if ($MaxPipelines -gt $QueueLength) {$MaxPipelines = $QueueLength}
# Create the script to be run by each runspace
$Script = "Set-Location '$PWD'; "
$Script += {
$SplitJobQueue = $($Input)
& {
trap {continue}
while ($SplitJobQueue.Count) {$SplitJobQueue.Dequeue()}
} |
}.ToString() + $Scriptblock
# Create an array to keep track of the set of pipelines
$Pipelines = New-Object System.Collections.ArrayList
# Collect the functions and aliases to import
$ImportItems = ($Function -replace '^','Function:') +
($Alias -replace '^','Alias:') |
Get-Item | select PSPath, Definition
$stopwatch = [System.Diagnostics.Stopwatch]::StartNew()
}
function Add-Pipeline {
# This creates a new runspace and starts an asynchronous pipeline with our script.
# It will automatically start processing objects from the shared queue.
$Runspace = [System.Management.Automation.Runspaces.RunspaceFactory]::CreateRunspace($Host)
$Runspace.Open()
if (!$?) { throw "Could not open runspace!" }
$Runspace.SessionStateProxy.SetVariable('SplitJobRunSpace', $True)
function CreatePipeline
{
param ($Data, $Scriptblock)
$Pipeline = $Runspace.CreatePipeline($Scriptblock)
if ($Data)
{
$Null = $Pipeline.Input.Write($Data, $True)
$Pipeline.Input.Close()
}
$Null = $Pipeline.Invoke()
$Pipeline.Dispose()
}
# Optionally import profile, variables, functions and aliases from the main runspace
if ($UseProfile)
{
CreatePipeline -Script "`$PROFILE = '$PROFILE'; . `$PROFILE"
}
if ($Variable)
{
foreach ($var in (Get-Variable $Variable -Scope 2))
{
trap {continue}
$Runspace.SessionStateProxy.SetVariable($var.Name, $var.Value)
}
}
if ($ImportItems)
{
CreatePipeline $ImportItems {
foreach ($item in $Input) {New-Item -Path $item.PSPath -Value $item.Definition}
}
}
if ($SnapIn)
{
CreatePipeline (Get-PSSnapin $Snapin -Registered) {$Input | Add-PSSnapin}
}
#Custom Initialization Script for startup of Pipeline - needs to be after other other items added.
if ($InitializeScript -ne $null)
{
CreatePipeline -Scriptblock $InitializeScript
}
$Pipeline = $Runspace.CreatePipeline($Script)
$Null = $Pipeline.Input.Write($Queue)
$Pipeline.Input.Close()
$Pipeline.InvokeAsync()
$Null = $Pipelines.Add($Pipeline)
}
function Remove-Pipeline ($Pipeline)
{
# Remove a pipeline and runspace when it is done
$Pipeline.RunSpace.CloseAsync()
#Removed Dispose so that Split-Job can be quickly aborted even if currently running something waiting for a timeout.
#Added call to [System.GC]::Collect() at end of script to free up what memory it can.
#$Pipeline.Dispose()
$Pipelines.Remove($Pipeline)
}
}
end
{
# Main
# Initialize the queue from the pipeline
. Init $Input
# Start the pipelines
try
{
while ($Pipelines.Count -lt $MaxPipelines -and $Queue.Count) {Add-Pipeline}
# Loop through the pipelines and pass their output to the pipeline until they are finished
while ($Pipelines.Count)
{
# Only update the progress bar once per $DisplayInterval
if (-not $NoProgress -and $Stopwatch.ElapsedMilliseconds -ge $DisplayInterval)
{
$Completed = $QueueLength - $Queue.Count - $Pipelines.count
$Stopwatch.Reset()
$Stopwatch.Start()
#$LastUpdate = $stopwatch.ElapsedMilliseconds
$PercentComplete = (100 - [Int]($Queue.Count)/$QueueLength*100)
$Duration = (Get-Date) - $StartTime
$DurationString = [timespan]::FromSeconds( [Math]::Floor($Duration.TotalSeconds)).ToString()
$ItemsPerSecond = $Completed / $Duration.TotalSeconds
$SecondsRemaining = [math]::Round(($QueueLength - $Completed)/ ( .{ if ($ItemsPerSecond -eq 0 ) { 0.001 } else { $ItemsPerSecond}}))
Write-Progress -Activity "** Split-Job ** *Press Esc to exit* Next item: $(trap {continue}; if ($Queue.Count) {$Queue.Peek()})" `
-status "Queues: $($Pipelines.Count) QueueLength: $($QueueLength) StartTime: $($StartTime) $($ProgressInfo)" `
-currentOperation "$( . { if ($ExitForced) { 'Aborting Job! ' }})Completed: $($Completed) Pending: $($QueueLength- ($QueueLength-($Queue.Count + $Pipelines.Count))) RunTime: $($DurationString) ItemsPerSecond: $([math]::round($ItemsPerSecond, 3))"`
-PercentComplete $PercentComplete `
-Id $ProgressID `
-SecondsRemaining $SecondsRemaining
}
foreach ($Pipeline in @($Pipelines))
{
if ( -not $Pipeline.Output.EndOfPipeline -or -not $Pipeline.Error.EndOfPipeline)
{
$Pipeline.Output.NonBlockingRead()
$Pipeline.Error.NonBlockingRead() | % { Write-Error -ErrorRecord $_ }
} else
{
# Pipeline has stopped; if there was an error show info and restart it
if ($Pipeline.PipelineStateInfo.State -eq 'Failed')
{
Write-Error $Pipeline.PipelineStateInfo.Reason
# Restart the runspace
if ($Queue.Count -lt $QueueLength) {Add-Pipeline}
}
Remove-Pipeline $Pipeline
}
if ( ((Get-Date) - $StartTime).TotalSeconds -ge $MaxDuration -and -not $ExitForced)
{
Write-Warning "Aborting job! The MaxDuration of $MaxDuration seconds has been reached. Inputs that have not been processed will be skipped."
$ExitForced=$true
}
if ($ExitForced) { $Pipeline.StopAsync(); Remove-Pipeline $Pipeline }
}
while ($Host.UI.RawUI.KeyAvailable)
{
if ($Host.ui.RawUI.ReadKey('NoEcho,IncludeKeyDown,IncludeKeyUp').VirtualKeyCode -eq 27 -and !$ExitForced)
{
$Queue.Clear();
Write-Warning 'Aborting job! Escape pressed! Inputs that have not been processed will be skipped.'
$ExitForced = $true;
#foreach ($Pipeline in @($Pipelines))
#{
# $Pipeline.StopAsync()
#}
}
}
if ($Pipelines.Count) {Start-Sleep -Milliseconds 50}
}
#Clear the Progress bar so other apps don't have to keep seeing it.
Write-Progress -Completed -Activity "`0" -Status "`0"
# Since reference to Dispose was removed. I added this to try to help with releasing resources as possible.
# This might be a bad idea, but I'm leaving it in for now. (Stephen Mills)
[GC]::Collect()
}
finally
{
foreach ($Pipeline in @($Pipelines))
{
if ( -not $Pipeline.Output.EndOfPipeline -or -not $Pipeline.Error.EndOfPipeline)
{
Write-Warning 'Pipeline still runinng. Stopping Async.'
$Pipeline.StopAsync()
Remove-Pipeline $Pipeline
}
}
}
}
}