Question:
Everywhere it is written that working with tasks is a cooperative process, that is, the task itself must complete correctly at the first request from the external code.
But what if someone fails?
For example, I Cancel
on the token and give it some time to complete, but the task does not complete, and the code should move on. Leave the task hanging?
I read that there is Thread.Abort
, but it is not recommended to use it.
An example with the removal of third-party code in a separate process was given.
I would like to see some other ways, at least a solution to the problem through the appDomain.
Answer:
Well, here's an example of implementation. I warn you right away, there will be a lot of code.
Let's take this unreliable function as a basis:
class EvilComputation
{
static Random random = new Random();
public static async Task<double> Compute(
int numberOfSeconds, double x, CancellationToken ct)
{
bool wellBehaved = random.Next(2) == 0;
var y = x * x;
var delay = TimeSpan.FromSeconds(numberOfSeconds);
await Task.Delay(delay, wellBehaved ? ct : CancellationToken.None);
return y;
}
}
You can see that the function is bad: depending on random conditions, it may not respond to cancellation.
What to do in this case? Let's take the function out into a separate process. This process can be killed without much harm to the original process.
In order to call a function in another process, you need to transfer data about the function call there. For communication, we use, for example, anonymous pipes (you can use essentially anything). I base the code on this example: How to: Use Anonymous Pipes for Local Interprocess Communication .
We will use standard binary formatting for data transfer, since we did not go through WCF. We need DTO objects that will be thrown between processes. They need to be used in two processes – the main and the auxiliary (let's call it a plugin ), so a separate assembly is required for DTO types.
Set up assembly OutProcCommonData
, we place in it the following classes:
namespace OutProcCommonData
{
[Serializable]
public class Command // общий класс-предок для посылаемой команды
{
}
[Serializable]
public class Evaluate : Command // команда на вычисление
{
public int NumberOfSecondsToProcess;
public double X;
}
[Serializable]
public class Cancel : Command // команда на отмену
{
}
}
Next, the returned result is:
namespace OutProcCommonData
{
[Serializable]
public class Response // общий класс-предок для возвращаемого результата
{
}
[Serializable]
public class Result : Response // готовый результат вычислений
{
public double Y;
}
[Serializable]
public class Error : Response // ошибка с текстом
{
public string Text;
}
[Serializable]
public class Cancelled : Response // подтверждение отмены
{
}
}
Next, our plugin. This is a separate console application (although if we don't want to see the console and debug output, we can make it non-console).
The communication protocol is as follows. The main program sends an Evaluate
, followed by a possibly Cancel
. The plugin returns Result
in case of a successful computation, Cancelled
in case of a cancellation signal received and a successful canceled computation, and Error
in case of an error (for example, violation of the communication protocol).
Here is the binding code:
class Plugin
{
static int Main(string[] args)
{
// нам должны быть переданы два аргумента: хендл входящего и исходящего пайпов
if (args.Length != 2)
{
Console.Error.WriteLine("Shouldn't be started directly");
return 1;
}
return new Plugin().Run(args[0], args[1]).Result;
}
BinaryFormatter serializer = new BinaryFormatter(); // для сериализации
async Task<int> Run(string hIn, string hOut)
{
Console.WriteLine("[Plugin] Running");
// открывем переданные пайпы
using (var inStream = new AnonymousPipeClientStream(PipeDirection.In, hIn))
using (var outStream = new AnonymousPipeClientStream(PipeDirection.Out, hOut))
{
try
{
var cts = new CancellationTokenSource(); // токен для отмены
Console.WriteLine("[Plugin] Reading args");
// пытаемся десериализовать аргументы
var args = SafeGet<OutProcCommonData.Evaluate>(inStream);
if (args == null)
{
Console.WriteLine("[Plugin] Didn't get args");
// отправляем ошибку, если не удалось
serializer.Serialize(
outStream,
new OutProcCommonData.Error() { Text = "Unrecognized input" });
// и выходим
return 3;
}
Console.WriteLine("[Plugin] Got args, start compute and waiting cancel");
// запускаем вычисление
var computeTask =
EvilComputation.Compute(
args.NumberOfSecondsToProcess,
args.X,
cts.Token);
// параллельно запускаем чтение возможной отмены
var waitForCancelTask = Task.Run(() =>
(OutProcCommonData.Cancel)serializer.Deserialize(inStream));
// дожидаемся одного из двух
var winner = await Task.WhenAny(computeTask, waitForCancelTask);
// если первой пришла отмена...
if (winner == waitForCancelTask)
{
Console.WriteLine("[Plugin] Got cancel, cancelling computation");
// просим вычисление завершиться
cts.Cancel();
}
// окончания вычисления всё равно нужно дождаться
Console.WriteLine("[Plugin] Awaiting computation");
// если вычисление отменится, здесь будет исключение
var result = await computeTask;
Console.WriteLine("[Plugin] Sending back result");
// отсылаем результат в пайп
serializer.Serialize(
outStream,
new OutProcCommonData.Result() { Y = result });
// нормальный выход
return 0;
}
catch (OperationCanceledException)
{
// мы успешно отменили задание, рапортуем
Console.WriteLine("[Plugin] Sending cancellation");
serializer.Serialize(
outStream,
new OutProcCommonData.Cancelled());
return 2;
}
catch (Exception ex)
{
// возникла непредвиденная ошибка, рапортуем
Console.WriteLine($"[Plugin] Sending error {ex.Message}");
serializer.Serialize(
outStream,
new OutProcCommonData.Error() { Text = ex.Message });
return 3;
}
}
}
// ну и вспомогательная функция, которая пытается читать данные из пайпа
T SafeGet<T>(Stream s) where T : class
{
try
{
return (T)serializer.Deserialize(s);
}
catch
{
return null;
}
}
}
I do not catch errors when writing to the pipe, add it yourself to taste.
Now, the main program. We will have it separately from the plugin (that is, we get three assemblies).
class Program
{
static void Main(string[] args) => new Program().Run().Wait();
async Task Run()
{
var cts = new CancellationTokenSource();
try
{
var y = await ComputeOutProc(2, cts.Token);
Console.WriteLine($"[Main] Result: {y}");
}
catch (TimeoutException)
{
Console.WriteLine("[Main] Timed out");
}
catch (OperationCanceledException)
{
Console.WriteLine("[Main] Cancelled");
}
}
const int SecondsToSend = 3;
const int TimeoutSeconds = 5;
const int CancelSeconds = 2;
BinaryFormatter serializer = new BinaryFormatter();
async Task<double> ComputeOutProc(double x, CancellationToken ct)
{
Process plugin = null;
bool pluginStarted = false;
try
{
// создаём исходящий и входящий пайпы
using (var commandStream = new AnonymousPipeServerStream(
PipeDirection.Out, HandleInheritability.Inheritable))
using (var responseStream = new AnonymousPipeServerStream(
PipeDirection.In, HandleInheritability.Inheritable))
{
Console.WriteLine("[Main] Starting plugin");
plugin = new Process()
{
StartInfo =
{
FileName = "OutProcPlugin.exe",
Arguments = commandStream.GetClientHandleAsString() + " " +
responseStream.GetClientHandleAsString(),
UseShellExecute = false
}
};
// запускаем плагин с параметрами
plugin.Start();
pluginStarted = true;
Console.WriteLine("[Main] Started plugin");
commandStream.DisposeLocalCopyOfClientHandle();
responseStream.DisposeLocalCopyOfClientHandle();
void Send(Command c)
{
serializer.Serialize(commandStream, c);
commandStream.Flush();
}
try
{
// отсылаем плагину команду на вычисление
Console.WriteLine("[Main] Sending evaluate request");
Send(new OutProcCommonData.Evaluate()
{
NumberOfSecondsToProcess = SecondsToSend,
X = x
});
Task<Response> responseTask;
bool readyInTime;
bool cancellationSent = false;
// внутри этого блока при отмене будем отсылать команду плагину
using (ct.Register(() =>
{
Send(new OutProcCommonData.Cancel());
Console.WriteLine("[Main] Requested cancellation");
cancellationSent = true;
}))
{
Console.WriteLine("[Main] Starting getting response");
// ожидаем получение ответа
responseTask = Task.Run(() =>
(Response)serializer.Deserialize(responseStream));
// или таймаута
var timeoutTask = Task.Delay(TimeSpan.FromSeconds(TimeoutSeconds));
var winner = await Task.WhenAny(responseTask, timeoutTask);
readyInTime = winner == responseTask;
}
// если наступил таймаут, просим процесс вежливо завершить вычисления
if (!readyInTime)
{
if (!cancellationSent)
{
Console.WriteLine("[Main] Not ready in time, sending cancel");
Send(new OutProcCommonData.Cancel());
}
else
{
Console.WriteLine("[Main] Not ready in time, cancel sent");
}
// и ждём ещё немного, ну или прихода ответа
var timeoutTask = Task.Delay(TimeSpan.FromSeconds(CancelSeconds));
await Task.WhenAny(responseTask, timeoutTask);
}
// если до сих пор ничего не пришло, плагин завис, убиваем его
if (!responseTask.IsCompleted)
{
Console.WriteLine("[Main] No response, killing plugin");
plugin.Kill(); // это завершит ожидание с исключением, по идее
// в ранних версиях .NET нужно было бы поймать
// это исключение
// и уходим с исключением-таймаутом
ct.ThrowIfCancellationRequested();
throw new TimeoutException();
}
// здесь мы уверены, что ожидание завершилось
Console.WriteLine("[Main] Obtaining response");
var response = await responseTask; // тут может быть брошено исключение
// если была затребована отмена, выходим
ct.ThrowIfCancellationRequested();
// проверяем тип результата:
switch (response)
{
case Result r:
// нормальный результат, возвращаем его
Console.WriteLine("[Main] Got result, returning");
return r.Y;
case Cancelled _:
// отмена не по ct = таймаут
Console.WriteLine("[Main] Got cancellation");
throw new TimeoutException();
case Error err:
// пришла ошибка, бросаем исключение
// лучше, конечно, определить собственный тип здесь
Console.WriteLine("[Main] Got error");
throw new Exception(err.Text);
default:
// сюда мы вообще не должны попасть, если плагин работает нормально
Console.WriteLine("[Main] Unexpected error");
throw new Exception("Unexpected response type");
}
}
catch (IOException e)
{
Console.WriteLine("[Main] IO error occured");
throw new Exception("IO Error", e);
}
}
}
finally
{
if (pluginStarted)
{
plugin.WaitForExit();
plugin.Close();
}
}
}
}
Mileage result:
[Main] Starting plugin
[Main] Started plugin
[Main] Sending evaluate request
[Main] Starting getting response
[Plugin] Running
[Plugin] Reading args
[Plugin] Got args, start compute and waiting cancel
[Plugin] Awaiting computation
[Plugin] Sending back result
[Main] Obtaining response
[Main] Got result, returning
[Main] Result: 4
If we change the SecondsToSend
constant to 10 so that there is a timeout, we get the following result of two runs:
For standard completion:
[Main] Starting plugin
[Main] Started plugin
[Main] Sending evaluate request
[Main] Starting getting response
[Plugin] Running
[Plugin] Reading args
[Plugin] Got args, start compute and waiting cancel
[Main] Not ready in time, sending cancel
[Plugin] Got cancel, cancelling computation
[Plugin] Awaiting computation
[Plugin] Sending cancellation
[Main] Obtaining response
[Main] Got cancellation
[Main] Timed out
To force termination:
[Main] Starting plugin
[Main] Started plugin
[Main] Sending evaluate request
[Main] Starting getting response
[Plugin] Running
[Plugin] Reading args
[Plugin] Got args, start compute and waiting cancel
[Main] Not ready in time, sending cancel
[Plugin] Got cancel, cancelling computation
[Plugin] Awaiting computation
[Main] No response, killing plugin
[Main] Timed out
If you add before
var y = await ComputeOutProc(2, cts.Token);
early cancellation:
cts.CancelAfter(TimeSpan.FromSeconds(1));
we get the following result: for standard completion
[Main] Starting plugin
[Main] Started plugin
[Main] Sending evaluate request
[Main] Starting getting response
[Plugin] Running
[Plugin] Reading args
[Plugin] Got args, start compute and waiting cancel
[Main] Requested cancellation
[Plugin] Got cancel, cancelling computation
[Plugin] Awaiting computation
[Plugin] Sending cancellation
[Main] Obtaining response
[Main] Cancelled
and for forced termination
[Main] Starting plugin
[Main] Started plugin
[Main] Sending evaluate request
[Main] Starting getting response
[Plugin] Running
[Plugin] Reading args
[Plugin] Got args, start compute and waiting cancel
[Main] Requested cancellation
[Plugin] Got cancel, cancelling computation
[Plugin] Awaiting computation
[Main] Not ready in time, cancel sent
[Main] No response, killing plugin
[Main] Cancelled
There is probably not enough error control in some places, so check to see if you need to catch any other exceptions.
You can add your own logic on top of this template. For example, similarly to a thread pool, you can create a pool of plugins and deliver tasks to the currently free plugin.