StaTaskScheduler y bombeo de mensajes de subprocesos de STA

Resuelto avo asked hace 10 años • 2 respuestas

TL;DR: Un punto muerto dentro de una tarea ejecutada por StaTaskScheduler.Versión larga:

Estoy usando ParallelExtensionsExtrasStaTaskScheduler de Parallel Team para alojar algunos objetos STA COM heredados proporcionados por un tercero. La descripción de los detalles de implementación dice lo siguiente:StaTaskScheduler

La buena noticia es que la implementación de TPL puede ejecutarse en subprocesos MTA o STA y tiene en cuenta diferencias relevantes en torno a las API subyacentes como WaitHandle.WaitAll (que solo admite subprocesos MTA cuando el método cuenta con múltiples identificadores de espera).

Pensé que eso significaría que las partes de bloqueo de TPL usarían una API de espera que envía mensajes, comoCoWaitForMultipleHandles , para evitar situaciones de bloqueo cuando se llama a un hilo de STA.

En mi situación, creo que está sucediendo lo siguiente: el objeto STA COM A en proceso realiza una llamada al objeto B fuera de proceso, luego espera una devolución de llamada de B como parte de la llamada saliente.

En forma simplificada:

var result = await Task.Factory.StartNew(() =>
{
    // in-proc object A
    var a = new A(); 
    // out-of-proc object B
    var b = new B(); 
    // A calls B and B calls back A during the Method call
    return a.Method(b);     
}, CancellationToken.None, TaskCreationOptions.None, staTaskScheduler);

El problema es que a.Method(b)nunca regresa. Hasta donde puedo decir, esto sucede porque una espera de bloqueo en algún lugar interno BlockingCollection<Task>no envía mensajes, por lo que mi suposición sobre la declaración citada probablemente sea incorrecta.

EDITADO El mismo código funciona cuando se ejecuta en el hilo de la interfaz de usuario de la aplicación WinForms de prueba (es decir, proporcionando TaskScheduler.FromCurrentSynchronizationContext()en lugar staTaskSchedulerdeTask.Factory.StartNew ).

¿Cuál es la forma correcta de resolver esto? ¿Debería implementar un contexto de sincronización personalizado, que enviaría mensajes explícitamente con CoWaitForMultipleHandlese instalarlo en cada hilo de STA iniciado porStaTaskScheduler ?

Si es así, ¿la implementación subyacente llamará a BlockingCollectionmi SynchronizationContext.Waitmétodo? ¿Puedo usarlo SynchronizationContext.WaitHelperpara implementar SynchronizationContext.Wait?


EDITADO con un código que muestra que un subproceso STA administrado no se activa cuando se realiza una espera de bloqueo. El código es una aplicación de consola completa lista para copiar/pegar/ejecutar:

using System;
using System.Collections.Concurrent;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleTestApp
{
    class Program
    {
        // start and run an STA thread
        static void RunStaThread(bool pump)
        {
            // test a blocking wait with BlockingCollection.Take
            var tasks = new BlockingCollection<Task>();

            var thread = new Thread(() => 
            {
                // Create a simple Win32 window 
                var hwndStatic = NativeMethods.CreateWindowEx(0, "Static", String.Empty, NativeMethods.WS_POPUP,
                    0, 0, 0, 0, IntPtr.Zero, IntPtr.Zero, IntPtr.Zero, IntPtr.Zero);

                // subclass it with a custom WndProc
                IntPtr prevWndProc = IntPtr.Zero;

                var newWndProc = new NativeMethods.WndProc((hwnd, msg, wParam, lParam) =>
                {
                    if (msg == NativeMethods.WM_TEST)
                        Console.WriteLine("WM_TEST processed");
                    return NativeMethods.CallWindowProc(prevWndProc, hwnd, msg, wParam, lParam);
                });

                prevWndProc = NativeMethods.SetWindowLong(hwndStatic, NativeMethods.GWL_WNDPROC, newWndProc);
                if (prevWndProc == IntPtr.Zero)
                    throw new ApplicationException();

                // post a test WM_TEST message to it
                NativeMethods.PostMessage(hwndStatic, NativeMethods.WM_TEST, IntPtr.Zero, IntPtr.Zero);

                // BlockingCollection blocks without pumping, NativeMethods.WM_TEST never arrives
                try { var task = tasks.Take(); }
                catch (Exception e) { Console.WriteLine(e.Message); }

                if (pump)
                {
                    // NativeMethods.WM_TEST will arrive, because Win32 MessageBox pumps
                    Console.WriteLine("Now start pumping...");
                    NativeMethods.MessageBox(IntPtr.Zero, "Pumping messages, press OK to stop...", String.Empty, 0);
                }
            });

            thread.SetApartmentState(ApartmentState.STA);
            thread.Start();

            Thread.Sleep(2000);

            // this causes the STA thread to end
            tasks.CompleteAdding(); 

            thread.Join();
        }

        static void Main(string[] args)
        {
            Console.WriteLine("Testing without pumping...");
            RunStaThread(false);

            Console.WriteLine("\nTest with pumping...");
            RunStaThread(true);

            Console.WriteLine("Press Enter to exit");
            Console.ReadLine();
        }
    }

    // Interop
    static class NativeMethods
    {
        [DllImport("user32")]
        public static extern IntPtr SetWindowLong(IntPtr hwnd, int nIndex, WndProc newProc);

        [DllImport("user32")]
        public static extern IntPtr CallWindowProc(IntPtr lpPrevWndFunc, IntPtr hwnd, int msg, int wParam, int lParam);

        [DllImport("user32.dll")]
        public static extern IntPtr CreateWindowEx(int dwExStyle, string lpClassName, string lpWindowName, int dwStyle, int x, int y, int nWidth, int nHeight, IntPtr hWndParent, IntPtr hMenu, IntPtr hInstance, IntPtr lpParam);

        [DllImport("user32.dll")]
        public static extern bool PostMessage(IntPtr hwnd, uint msg, IntPtr wParam, IntPtr lParam);

        [DllImport("user32.dll")]
        public static extern int MessageBox(IntPtr hwnd, string text, String caption, int options);

        public delegate IntPtr WndProc(IntPtr hwnd, int msg, int wParam, int lParam);

        public const int GWL_WNDPROC = -4;
        public const int WS_POPUP = unchecked((int)0x80000000);
        public const int WM_USER = 0x0400;

        public const int WM_TEST = WM_USER + 1;
    }
}

Esto produce la salida:

Probando sin bombear...
El argumento de la colección está vacío y se ha marcado como completo con respecto a las adiciones.

Prueba con bombeo...
El argumento de la colección está vacío y se ha marcado como completo con respecto a las adiciones.
Ahora empieza a bombear...
WM_TEST procesado
Presione Entrar para salir
avo avatar Jan 19 '14 08:01 avo
Aceptado

Según tengo entendido, su problema está utilizando StaTaskSchedulersolo para organizar el apartamento COM STA clásico para sus objetos COM heredados. No está ejecutando un bucle de mensajes principal de WinForms o WPF en el hilo de STA de StaTaskScheduler. Es decir, no estás usando nada como Application.Runo dentro Application.DoEventsde Dispatcher.PushFrameese hilo. Corríjame si esta es una suposición errónea.

Por sí solo, StaTaskScheduler no instala ningún contexto de sincronización en los subprocesos de STA que crea. Por lo tanto, confía en el CLR para enviar mensajes por usted. Solo encontré una confirmación implícita de que CLR bombea en subprocesos STA, en Apartments y Pumping in the CLR de Chris Brumme:

Sigo diciendo que el bloqueo administrado realizará "algo de bombeo" cuando se le solicite en un subproceso de STA. ¿No sería fantástico saber exactamente qué se bombeará? Desafortunadamente, bombear es un arte negro que está más allá de la comprensión mortal. En Win2000 y versiones posteriores, simplemente delegamos al servicio CoWaitForMultipleHandles de OLE32 .

Esto indica que CLR utiliza CoWaitForMultipleHandlesinternamente para subprocesos de STA. Además, los documentos de MSDN para COWAIT_DISPATCH_WINDOW_MESSAGESla bandera mencionan esto :

... en STA se envía solo un pequeño conjunto de mensajes en mayúsculas y minúsculas especiales.

Investigué un poco sobre eso , pero no pude extraer el WM_TESTcódigo de muestra CoWaitForMultipleHandles, lo discutimos en los comentarios a su pregunta. Según tengo entendido, el pequeño conjunto de mensajes en mayúsculas especiales mencionado anteriormente está realmente limitado a algunos mensajes específicos del separador COM y no incluye ningún mensaje normal de propósito general como el suyo WM_TEST.

Entonces, para responder a tu pregunta:

... ¿Debería implementar un contexto de sincronización personalizado, que enviaría mensajes explícitamente con CoWaitForMultipleHandles e instalarlo en cada hilo de STA iniciado por StaTaskScheduler?

Sí, creo que crear un contexto de sincronización personalizado y anularlo SynchronizationContext.Waites de hecho la solución correcta.

Sin embargo, debes evitar el uso CoWaitForMultipleHandlesy utilizar MsgWaitForMultipleObjectsExen su lugar . Si MsgWaitForMultipleObjectsExindica que hay un mensaje pendiente en la cola, debes enviarlo manualmente con PeekMessage(PM_REMOVE)y DispatchMessage. Luego deberás seguir esperando los identificadores, todos dentro de la misma SynchronizationContext.Waitllamada.

Tenga en cuenta que hay una diferencia sutil pero importante entre MsgWaitForMultipleObjectsExy MsgWaitForMultipleObjects. Este último no regresa y sigue bloqueándose si ya hay un mensaje visto en la cola (por ejemplo, con PeekMessage(PM_NOREMOVE)o GetQueueStatus), pero no eliminado. Eso no es bueno para el bombeo, porque sus objetos COM podrían estar usando algo así como PeekMessagepara inspeccionar la cola de mensajes. Más tarde, eso podría provocar MsgWaitForMultipleObjectsun bloqueo cuando no se esperaba.

OTOH, MsgWaitForMultipleObjectsExcon MWMO_INPUTAVAILABLEbandera, no tiene tal defecto y regresaría en este caso.

Hace un tiempo creé una versión personalizada de StaTaskScheduler( disponible aquí comoThreadAffinityTaskScheduler ) en un intento de resolver un problema diferente : mantener un grupo de subprocesos con afinidad de subprocesos para awaitcontinuaciones posteriores. La afinidad de subprocesos es vital si utiliza objetos COM de STA en varios archivos awaits. El original StaTaskSchedulermuestra este comportamiento sólo cuando su grupo está limitado a 1 subproceso.

Así que seguí adelante y experimenté un poco más con tu WM_TESTcaso. Originalmente, instalé una instancia de la SynchronizationContextclase estándar en el hilo de STA. El WM_TESTmensaje no fue difundido, como se esperaba.

Luego lo anulé SynchronizationContext.Waitpara reenviarlo a SynchronizationContext.WaitHelper. Lo llamaron, pero aún así no bombeó.

Finalmente, implementé un bucle de bombeo de mensajes con todas las funciones, aquí está la parte principal:

// the core loop
var msg = new NativeMethods.MSG();
while (true)
{
    // MsgWaitForMultipleObjectsEx with MWMO_INPUTAVAILABLE returns,
    // even if there's a message already seen but not removed in the message queue
    nativeResult = NativeMethods.MsgWaitForMultipleObjectsEx(
        count, waitHandles,
        (uint)remainingTimeout,
        QS_MASK,
        NativeMethods.MWMO_INPUTAVAILABLE);

    if (IsNativeWaitSuccessful(count, nativeResult, out managedResult) || WaitHandle.WaitTimeout == managedResult)
        return managedResult;

    // there is a message, pump and dispatch it
    if (NativeMethods.PeekMessage(out msg, IntPtr.Zero, 0, 0, NativeMethods.PM_REMOVE))
    {
        NativeMethods.TranslateMessage(ref msg);
        NativeMethods.DispatchMessage(ref msg);
    }
    if (hasTimedOut())
        return WaitHandle.WaitTimeout;
}

Esto funciona, WM_TESTse bombea. A continuación se muestra una versión adaptada de su prueba:

public static async Task RunAsync()
{
    using (var staThread = new Noseratio.ThreadAffinity.ThreadWithAffinityContext(staThread: true, pumpMessages: true))
    {
        Console.WriteLine("Initial thread #" + Thread.CurrentThread.ManagedThreadId);
        await staThread.Run(async () =>
        {
            Console.WriteLine("On STA thread #" + Thread.CurrentThread.ManagedThreadId);
            // create a simple Win32 window
            IntPtr hwnd = CreateTestWindow();

            // Post some WM_TEST messages
            Console.WriteLine("Post some WM_TEST messages...");
            NativeMethods.PostMessage(hwnd, NativeMethods.WM_TEST, new IntPtr(1), IntPtr.Zero);
            NativeMethods.PostMessage(hwnd, NativeMethods.WM_TEST, new IntPtr(2), IntPtr.Zero);
            NativeMethods.PostMessage(hwnd, NativeMethods.WM_TEST, new IntPtr(3), IntPtr.Zero);
            Console.WriteLine("Press Enter to continue...");
            await ReadLineAsync();

            Console.WriteLine("After await, thread #" + Thread.CurrentThread.ManagedThreadId);
            Console.WriteLine("Pending messages in the queue: " + (NativeMethods.GetQueueStatus(0x1FF) >> 16 != 0));

            Console.WriteLine("Exiting STA thread #" + Thread.CurrentThread.ManagedThreadId);
        }, CancellationToken.None);
    }
    Console.WriteLine("Current thread #" + Thread.CurrentThread.ManagedThreadId);
}

La salida :

Hilo inicial #9
En el hilo STA #10
Publicar algunos mensajes WM_TEST...
Presione Entrar para continuar...
WM_TEST procesado: 1
WM_TEST procesado: 2
WM_TEST procesado: 3

Después de esperar, hilo #10
Mensajes pendientes en la cola: Falso
Saliendo del hilo STA #10
Tema actual #12
presiona cualquier tecla para salir

Tenga en cuenta que esta implementación admite tanto la afinidad del subproceso (permanece en el subproceso n.º 10 después de await) como el bombeo de mensajes. El código fuente completo contiene partes reutilizables ( ThreadAffinityTaskSchedulery ThreadWithAffinityContext) y está disponible aquí como aplicación de consola independiente . No ha sido probado exhaustivamente, así que úselo bajo su propia responsabilidad.

noseratio avatar Jan 27 '2014 02:01 noseratio

El tema del bombeo de subprocesos de STA es amplio y muy pocos programadores se divierten resolviendo puntos muertos. El artículo fundamental al respecto fue escrito por Chris Brumme, un director inteligente que trabajó en .NET. Lo encontrarás en esta entrada del blog . Desafortunadamente, es bastante breve en detalles, no va más allá de señalar que el CLR bombea un poco pero sin ningún detalle sobre las reglas exactas.

El código del que habla, agregado en .NET 2.0, está presente en una función CLR interna denominada MsgWaitHelper(). El código fuente de .NET 2.0 está disponible a través de la distribución SSCLI20. Muy completo, pero la fuente de MsgWaitHelper() no está incluida. Bastante inusual. Descompilarlo es más bien una causa perdida, es muy grande.

Lo único que podemos sacar de la publicación de su blog es el peligro de reingreso . Bombear un subproceso STA es peligroso por su capacidad de enviar mensajes de Windows y ejecutar código arbitrario cuando su programa no está en el estado correcto para permitir que dicho código se ejecute. Algo que la mayoría de los programadores de VB6 saben cuando usó DoEvents() para obtener un bucle modal en su código para dejar de congelar la interfaz de usuario. Escribí un post sobre sus peligros más típicos. MsgWaitHelper() hace exactamente este tipo de bombeo por sí mismo, sin embargo, es muy selectivo sobre exactamente qué tipo de código permite ejecutar.

Puede obtener una idea de lo que hace dentro de su programa de prueba ejecutando el programa sin un depurador adjunto y luego adjuntando un depurador no administrado. Lo verás bloqueándose en NtWaitForMultipleObjects(). Di un paso más y establecí un punto de interrupción en PeekMessageW() para obtener este seguimiento de pila:

user32.dll!PeekMessageW()   Unknown
combase.dll!CCliModalLoop::MyPeekMessage(tagMSG * pMsg, HWND__ * hwnd, unsigned int min, unsigned int max, unsigned short wFlag) Line 2305  C++
combase.dll!CCliModalLoop::PeekRPCAndDDEMessage() Line 2008 C++
combase.dll!CCliModalLoop::FindMessage(unsigned long dwStatus) Line 2087    C++
combase.dll!CCliModalLoop::HandleWakeForMsg() Line 1707 C++
combase.dll!CCliModalLoop::BlockFn(void * * ahEvent, unsigned long cEvents, unsigned long * lpdwSignaled) Line 1645 C++
combase.dll!ClassicSTAThreadWaitForHandles(unsigned long dwFlags, unsigned long dwTimeout, unsigned long cHandles, void * * pHandles, unsigned long * pdwIndex) Line 46 C++
combase.dll!CoWaitForMultipleHandles(unsigned long dwFlags, unsigned long dwTimeout, unsigned long cHandles, void * * pHandles, unsigned long * lpdwindex) Line 120 C++
clr.dll!MsgWaitHelper(int,void * *,int,unsigned long,int)   Unknown
clr.dll!Thread::DoAppropriateWaitWorker(int,void * *,int,unsigned long,enum WaitMode)   Unknown
clr.dll!Thread::DoAppropriateWait(int,void * *,int,unsigned long,enum WaitMode,struct PendingSync *)    Unknown
clr.dll!CLREventBase::WaitEx(unsigned long,enum WaitMode,struct PendingSync *)  Unknown
clr.dll!CLREventBase::Wait(unsigned long,int,struct PendingSync *)  Unknown
clr.dll!Thread::Block(int,struct PendingSync *) Unknown
clr.dll!SyncBlock::Wait(int,int)    Unknown
clr.dll!ObjectNative::WaitTimeout(bool,int,class Object *)  Unknown

Tenga en cuenta que registré este seguimiento de pila en Windows 8.1; se verá bastante diferente en versiones anteriores de Windows. El bucle modal COM ha sido modificado en gran medida en Windows 8; también es muy importante para los programas WinRT. No sé mucho al respecto, pero parece tener otro modelo de subprocesos STA llamado ASTA que realiza un tipo de bombeo más restrictivo, consagrado en el CoWaitForMultipleObjects() agregado.

ObjectNative::WaitTimeout() es donde SemaphoreSlim.Wait() dentro del método BlockingCollection.Take() comienza a ejecutar el código CLR. Lo ve avanzando a través de los niveles del código CLR interno para llegar a la mítica función MsgWaitHelper(), y luego cambia al infame bucle del despachador modal COM.

La señal de murciélago de que está haciendo el tipo de bombeo "incorrecto" en su programa es la llamada al método CliModalLoop::PeekRPCAndDDEMessage(). En otras palabras, solo se considera el tipo de mensajes de interoperabilidad que se publican en una ventana interna específica que envía las llamadas COM que cruzan los límites de un apartamento. No enviará los mensajes que están en la cola de mensajes para su propia ventana.

Este es un comportamiento comprensible, Windows sólo puede estar absolutamente seguro de que el reingreso no matará su programa cuando pueda ver que su hilo de interfaz de usuario está inactivo . Está inactivo cuando bombea el bucle de mensajes; una llamada a PeekMessage() o GetMessage() indica ese estado. El problema es que no te animas. Violó el contrato principal de un hilo de STA, debe bombear el bucle de mensajes. Esperar que el bucle modal COM haga el bombeo por usted es, por lo tanto, una esperanza inútil.

De hecho, puedes solucionar este problema, aunque no te recomiendo que lo hagas. El CLR dejará que la propia aplicación realice la espera mediante un objeto SynchronizationContext.Current construido correctamente. Puede crear uno derivando su propia clase y anulando el método Wait(). Llame al método SetWaitNotificationRequired() para convencer al CLR de que debería dejarlo en sus manos. Una versión incompleta que demuestra el enfoque:

class MySynchronizationProvider : System.Threading.SynchronizationContext {
    public MySynchronizationProvider() {
        base.SetWaitNotificationRequired();
    }
    public override int Wait(IntPtr[] waitHandles, bool waitAll, int millisecondsTimeout) {
        for (; ; ) {
            int result = MsgWaitForMultipleObjects(waitHandles.Length, waitHandles, waitAll, millisecondsTimeout, 8);
            if (result == waitHandles.Length) System.Windows.Forms.Application.DoEvents();
            else return result;
        }
    }
    [DllImport("user32.dll")]
    private static extern int MsgWaitForMultipleObjects(int cnt, IntPtr[] waitHandles, bool waitAll,
        int millisecondTimeout, int mask);        
}

E instálalo al comienzo de tu hilo:

    System.ComponentModel.AsyncOperationManager.SynchronizationContext =
        new MySynchronizationProvider();

Ahora verás que se envía tu mensaje WM_TEST. Es la llamada a Application.DoEvents() la que lo envió. Podría haberlo ocultado usando PeekMessage + DispatchMessage, pero eso ocultaría el peligro de este código; es mejor no colocar DoEvents() debajo de la mesa. Realmente estás jugando un juego de reentrada muy peligroso. No utilices este código.

En pocas palabras, la única esperanza de usar StaThreadScheduler correctamente es cuando se usa en código que ya implementó el contrato STA y bombea como debería hacerlo un subproceso STA. En realidad, estaba pensado como una curita para el código antiguo en el que no es necesario darse el lujo de controlar el estado del hilo. Como cualquier código que comenzó en un programa VB6 o complemento de Office. Experimentando un poco con ello, no creo que realmente pueda funcionar. También es notable que su necesidad debería eliminarse por completo con la disponibilidad de asych/await.

Hans Passant avatar Jan 26 '2014 11:01 Hans Passant