Смекни!
smekni.com

MSSQL 2005 (Yukon) – работа с очередями и асинхронная обработка данных (стр. 6 из 7)

Клиент 1:

...SqlCommand cmd = new SqlCommand( "select id, [tm], Data from dbo.AsyncTest where id = @id")cmd.Parameters.AddWithValue("@id", 2);...

Клиент 2:

...SqlCommand cmd = new SqlCommand( "select id, [tm], Data from dbo.AsyncTest where id = @id")cmd.Parameters.AddWithValue("@id", 3);...

На эти два запроса ядром сервера будет сформирован один запрос на отслеживание изменений, но первый клиент получит извещение только в том случае, если поменялась запись с ID = 2, а второй – если поменялась запись с ID = 3

СОВЕТСписок подписчиков, ожидающих извещения, можно просмотреть с помощью специального системного представления sys.dm_qn_subscriptions.

SqlNotificationRequest

Механизм извещений этого класса работает по такому же принципу, что и SqlDependency, но на чуть более низком уровне, со всеми вытекающими из этого последствиями. То есть за несколько большую гибкость необходимо расплатиться чуть большей работой при реализации. Однако основное ограничение по-прежнему присутствует, так как серверный механизм определения изменений и помещения сообщения в очередь все тот же. Главное отличие от SqlDependency состоит в том, что в данном случае работа сервера ограничивается помещением сообщения в очередь, а уж о том, чтобы забрать сообщение из этой очереди клиент должен позаботиться сам. Соответственно, никакие серверные .Net-процедуры не используются, а значит можно обойтись и без разрешения выполнения .Net-процедур. Также нет необходимости обеспечивать доступность клиента с сервера, но расплачиваться за это приходится постоянным соединением клиента с сервером, что при большом числе клиентов может быть достаточно накладным.

В качестве сервиса ServiseBroker-а, используемого для доставки сообщения, можно использовать как стандартный сервис, предназначенный для извещения об изменениях в результатах запроса: [http://schemas.microsoft.com/SQL/Notifications/QueryNotificationService], так и создать свой собственный:

-- Сначала создаем очередь--CREATE QUEUE NotifyTestQueueGO-- Затем на базе этой очереди создаем сервис-- Используется тот же самый контракт, специально -- предназначенный для извещений об изменениях в результатах запроса.CREATE SERVICE NotifyTestService ON QUEUE NotifyTestQueue ([http://schemas.microsoft.com/SQL/Notifications/PostQueryNotification])GO

Сам же код клиента, который делает примерно то же самое, что и предыдущий пример, используя очередь и сервис, созданные выше, может выглядеть следующим образом:

using System;using System.Data;using System.Data.Sql;using System.Data.SqlClient;namespace Rsdn.AsyncTest{ public class NotificationTest { private string _connectionString = "Data Source=localhost\ctpapril;Initial Catalog=cavy;" + "Integrated Security=SSPI;Pooling=false"; public void GetData() { using (SqlConnection connection = new SqlConnection(_connectionString)) { connection.Open(); SqlCommand cmd = new SqlCommand( "SELECT ID, [Time], Data FROM dbo.AsyncTest", connection); // Инициализируемобъект SqlNotificationRequest // SqlNotificationRequest notifyRequest = new SqlNotificationRequest(); notifyRequest.UserData = "Any User Data"; notifyRequest.Options = "service=NotifyTestService"; notifyRequest.Timeout = 600;// И передаем его на сервер вместе с SqlCommand// cmd.Notification = notifyRequest; SqlDataReader rdr = cmd.ExecuteReader(); while (rdr.Read()) Console.WriteLine(rdr[0] + "\t" + rdr[2] + "\t" + rdr[1]);} // Вызов метода, который будет караулить очередь на предмет // появления извещений об изменении данных в результате запроса// WaitForChanges(); } public void WaitForChanges() { using (SqlConnection connection = new SqlConnection(_connectionString)) { connection.Open(); SqlCommand cmd = new SqlCommand( "WAITFOR (Receive convert(xml, message_body) from NotifyTestQueue)",connection); // Timeout выставляем в бесконечность, или, по крайней мере, // больше, чем Timeout notifyRequest'а, чтобы клиент гарантированно // дождался изменений. cmd.CommandTimeout = 0; // В этом месте поток ожидает поступления извещения об изменении.// object o = cmd.ExecuteScalar(); Console.WriteLine(o);} } }}

Разберем этот код подробнее.

Использование метода GetData опять-таки, как и в предыдущем примере, мало отличается от стандартного использования объекта SqlCommand, создается лишь дополнительный объект SqlNotificationRequest, который явно инициализируется всеми данными, необходимыми для запуска механизма извещений. Затем этот объект передается на сервер вместе с SqlCommand.

Здесь стоит обратить внимание на свойство Options. Очевидно, через него можно задать дополнительные опции извещения. В примере указан используемый сервис, однако там же можно задать используемую базу, экземпляр Service Broker-а и еще ряд параметров.

На этом работа метода GetData() завершается и дело переходит к методу WaitForChanges(). Этот метод тоже не отличается излишней сложностью. На сервер отправляется запрос WAITFOR(RECEIVE …), ожидающий поступления сообщений из указанной очереди или таймаута. Ключевая особенность - в процессе ожидания соединение с БД открыто. Как можно заметить на примере, это не обязательно должно быть то же самое подключение, что использовалось при отправке запроса, но, тем не менее, оно должно быть, то есть клиент обязан сам получить сообщение.

Само сообщение представляет собой XML-строку, это требование стандартного контракта [http://schemas.microsoft.com/SQL/Notifications/PostQueryNotification], используемого в данном механизме. На самом деле сообщение из предыдущего примера представляет собой такой же XML, но класс SqlDependency разбирает его и предоставляет доступ к данным уже через свойства некоего объекта, здесь же разбор XML придется делать самостоятельно. Сам XML представляет собой примерно следующую конструкцию:

<qn:QueryNotification xmlns:qn="http://schemas.microsoft.com/SQL/Notifications/QueryNotification" id="142" type="change" source="data" info="update" database_id="9" sid="0x0105000000000005150000001BB462FDDA7A03005D16C93DEB030000"> <qn:Message>Any User Data</qn:Message> </qn:QueryNotification>

Здесь ID – это номер подписки в списке sys.dm_qn_subscriptions, type, source и info - та же информация, что и в случае SqlDependency, database_id, очевидно, идентификатор базы, где все происходит, а sid – идентификатор пользователя, вызвавшего изменения. Узел <qn:Message>, как можно заметить, содержит произвольные данные пользователя, которые были указаны при инициализации SqlNotificationRequest.

Надо заметить, что ожидание события в методе WaitForCange(), производимое через WAITFOR(RECEIVE …) само по себе не вызывает никаких накладных расходов. Основные ресурсы отнимает удержание подключения и, с точки зрения сервера, они довольно велики. К дополнительным накладным расходам следует отнести и тот факт, что в реальном приложении, как минимум, метод WaitForChange() должен выполняться в отдельном потоке, иначе подобная функциональность имеет довольно мало смысла. И если с расходами на подключение справиться довольно сложно, то создание дополнительного потока можно обойти.

Асинхронный вариант использования SqlNotification

Попытаемся теперь применить знания об асинхронных подключениях для некоторого улучшения механизма извещений об изменениях в результатах запроса. Слегка переделанный класс для получения извещений с использованием асинхронных запросов может выглядеть примерно так:

using System;using System.Data;using System.Data.Sql;using System.Data.SqlClient;namespace Rsdn.NotifyTest{ public class AsyncNotification { private string _connectionString = "Data Source=localhost&bsol;ctpapril;Initial Catalog=cavy;" + "Integrated Security=SSPI;Pooling=false;"; public void GetData() { using (SqlConnection syncConnect = new SqlConnection(_connectionString)) { syncConnect.Open(); SqlCommand cmd = new SqlCommand("SELECT ID, [Time]," + "Data FROM dbo.AsyncTest", syncConnect); // Инициализируемобъект SqlNotificationRequest // SqlNotificationRequest notifyRequest = new SqlNotificationRequest(); notifyRequest.UserData = "Any User Data"; notifyRequest.Options = "service=NotifyTestService"; notifyRequest.Timeout = 600;// И передаем его на сервер вместе с SqlCommand// cmd.Notification = notifyRequest; SqlDataReader rdr = cmd.ExecuteReader(); while (rdr.Read()) Console.WriteLine(rdr[0] + "&bsol;t" + rdr[2] + "&bsol;t" + rdr[1]);rdr.Close(); } // асинхронное подключение создаем отдельно, без using, не закрывая// SqlConnection asyncConnect = new SqlConnection( _connectionString + "Asynchronous Processing=true;"); SqlCommand cmd2 = new SqlCommand( "WAITFOR (Receive convert(xml, message_body) from NotifyTestQueue)", asyncConnect); asyncConnect.Open(); cmd2.BeginExecuteReader( new AsyncCallback(Callback), cmd2, CommandBehavior.CloseConnection); } public void Callback(IAsyncResult result) { SqlDataReader DR = null; try { // Объект SqlCommand, длявызоваасинхронного EndExecuteReader, был// передан при вызове BeginExecuteReader, иначе из этого метода // добраться до него было бы проблематичноSqlCommand cmd = (SqlCommand)result.AsyncState; DR = cmd.EndExecuteReader(result); if (DR.Read()) Console.WriteLine(DR[0]); DR.Close();} catch (Exception ex) { // Так как этот метод вызывается совсем из другого потока, то здесь // единственный шанс отловить и обработать какое-либо исключение.// Console.WriteLine( String.Format("Last error: {0}", ex.Message)); } finally { if (DR != null && !DR.IsClosed){ DR.Close(); } } } }}

Постоянное соединение с БД удерживать по-прежнему необходимо, но потребность в создании отдельного потока для ожидания изменений данных отпала. Первая часть процедуры GetData практически полностью повторяет аналогичную процедуру из предыдущего примера, но после закрытия подключения для первого запроса вместо вызова WaitForChanges() создается подключение с возможностью выполнения асинхронных команд. Обратите внимание на выражение Asyncronous Processing = true, добавленное в строку подключения. Далее создается обычный SqlCommand, со знакомым уже запросом WAITFOR(RECEIVE …), и открывается подключение к БД. После этого выполняется первая часть асинхронной команды, передающая текст запроса на выполнение. В качестве дополнительных параметров указывается функция обратного вызова Callback(), сам объект SqlCommand, который затем передастся в Callback() как одно из свойств IAsyncResult, и CommandBehaviour указывается такой, чтобы закрыть подключение после завершения чтения данных – ведь внутри Callback() объект SqlCommand будет уже недоступен. На этом функция GetData() завершает свою работу, и основной поток идет заниматься своими делами.

В тот момент, когда в очередь попадает извещение, ожидание заканчивается. Сетевой драйвер оповещает, что асинхронная операция закончена. Вызывается метод Callback(), в который в качестве параметра передается IAsyncResult, позволяющий, во-первых, получить ответ от сервера, а во-вторых, содержащий в одном из полей и сам объект SqlCommand, метод которого EndExecuteReader добывает этот самый результат. Иными словами, мы сначала извлекаем из IAsyncResult объект SqlCommand, а затем в SqlCommand.EndExecueReader передаем все тот же IAsyncResult, чтобы получить SqlDataReader с результатом. В данном случае результат представляет собой знакомую уже XML-строку.