(function() { var ga = document.createElement('script'); ga.type = 'text/javascript'; ga.async = true; ga.src = ('https:' == document.location.protocol ? 'https://ssl' : 'http://www') + '.google-analytics.com/ga.js'; var s = document.getElementsByTagName('script')[0]; s.parentNode.insertBefore(ga, s); })();

一个小程序的性能优化,从5小时到170秒的改进

Categories: 未分类
Tags: No Tags
Comments: No Comments
Published on: 2011 年 08 月 10 日

昨天帮同事优化了一个小程序,程序的功能很简单,将每天产生的若干压缩包解压读取清单文件入库,并根据清单文件的情况选择包内的文件对数据库进行字段更新,每条记录被更新的2个字段分别存储在2个文件中,文件都是小文件,90%以上小于2K.压缩文件有1600左右,解压之后文件数量在82W个。

一般处理的思路就是将文件解压,然后读取清单,与数据库比较去重,入库,更新,删除文件,这些功能都可以在多线程下完成。逻辑思路就是这样,但是对于性能的影响主要在程序设计中对于文件的处理上。82W个文件多线程解压至少要25min,问什么这么慢呢?其主要原因并不是在CPU上,IO设计上是关键。

计算机的存储主要是用来持久数据的,根据持久的时间、访问的模式来看,主要就是HDD、RAM、CPU Cache.我们的这个应用文件IO主要是拼IOPS。我没有SSD,所以这个指标也不太高,那就只能考虑RAM了。如果处理文件的尺寸比较大,除了HDD我们肯定是没有办法,但是如果文件的尺寸很小内存就是一个很不错的选择。由于我的业务只需要读取到压缩文件的内容,对于文件是否解压实不关心的(解开最后也会删除),最佳方式是在内存里面进行解压操作,内存的IOPS比HDD高出若干数量级,只要控制好线程和内存的使用就可以了。处理压缩文件可以先读取文件中的文件列表,找出需要的文件进行解压,这样也能减少IO次数。

.net 4.0里面的并行类可以很方便的给我们提供多线程操作的功能,用起来也很方便。

再谈分页

Categories: 未分类
Tags: No Tags
Comments: No Comments
Published on: 2011 年 04 月 11 日

分页已经是很久远的话题了,很多时候都是靠着一些技巧来实现分页的,随着SQL Server版本的不断更新我们有了更多的选择,之前有人总结了一下我们现在可以使用的一些分页技术 http://www.cnblogs.com/qiuwuyu/archive/2011/03/21/1989870.html。在SQL 05/08里面我们可以使用CTE&Row_Number的方式实现分页,在SQL 2000里面我们基本上都是使用Top和Set RowCount的方式实现。你会发现高版本上提供的语句更加简洁和可读,降低了逻辑的复杂性。在下一个版本中Denali中,SQL Server在Order By函数中提供了一组语句可以实现分页并且从性能到程序的可读性有很大提高。

下面是程序的代码

USE AdventureWorks2008R2;
GO
CREATE TABLE dbo.AppSettings (AppSettingID int NOT NULL, PageSize int NOT NULL);
GO
INSERT INTO dbo.AppSettings VALUES(1, 10);
GO
DECLARE @StartingRowNumber tinyint = 1;
SELECT DepartmentID, Name, GroupName
FROM HumanResources.Department
ORDER BY DepartmentID ASC
OFFSET @StartingRowNumber ROWS
FETCH NEXT (SELECT PageSize FROM dbo.AppSettings WHERE AppSettingID = 1) ROWS ONLY;
怎么样不错吧,而且可以结合上Snapshot,确保返回结果的稳定

USE AdventureWorks2008R2;
GO

— Ensure the database can support the snapshot isolation level set for the query.
IF (SELECT snapshot_isolation_state FROM sys.databases WHERE name = N’AdventureWorks2008R2′) = 0
ALTER DATABASE AdventureWorks2008R2 SET ALLOW_SNAPSHOT_ISOLATION ON;
GO

— Set the transaction isolation level  to SNAPSHOT for this query.
SET TRANSACTION ISOLATION LEVEL SNAPSHOT;
GO

— Beging the transaction
BEGIN TRANSACTION;
GO
— Declare and set the variables for the OFFSET and FETCH values.
DECLARE @StartingRowNumber int = 1
, @RowCountPerPage int = 3;

— Create the condition to stop the transaction after all rows have been returned.
WHILE (SELECT COUNT(*) FROM HumanResources.Department) >= @StartingRowNumber
BEGIN

— Run the query until the stop condition is met.
SELECT DepartmentID, Name, GroupName
FROM HumanResources.Department
ORDER BY DepartmentID ASC
OFFSET @StartingRowNumber – 1 ROWS
FETCH NEXT @RowCountPerPage ROWS ONLY;

— Increment @StartingRowNumber value.
SET @StartingRowNumber = @StartingRowNumber + @RowCountPerPage;
CONTINUE
END;
GO
COMMIT TRANSACTION;
GO

Offset N Rows Fetch Next N Rows Only 这种可读性已经是最好了,简单到不能再简单了,如果我的程序员还不会用,那只能Fire他了。
在Warm cache的情况下(没有物理读,只有逻辑读),新的语句对于CPU的使用比较低,大家可以测试一下。
这个功能目前在CTP1中都可以实现了,大家可以从http://www.microsoft.com/downloads/en/details.aspx?FamilyID=6a04f16f-f6be-4f92-9c92-f7e5677d91f9 进行下载。

目前由于还没有发布,所以在EF中也无法得到支持,个人感觉应该在后续的EF中提供这种支持,这样对于程序员来说就一点SQL都不用写了,否则还要自己写一个extension

理解SSIS中的Pipeline

Categories: SQL, SSIS, 未分类
Tags: No Tags
Comments: No Comments
Published on: 2011 年 01 月 13 日

自从SQL 2000以后在SQL Server上SSIS就成了一个非常重要的产品。相比DTS,最大的变化不是在那些转换的控件上,而是运行体系,清晰的运行方式让开发人员可以更加深入的理解任务和数据的关系,能够将整个数据集成的项目清晰的分割成不同的阶段和任务。新架构下产生了Data Flow和Control Flow的概念。这次就重点聊聊Data Flow,希望读者们能慢慢理解其中之精髓,不明白的可以留言。(通常情况下我太愿意贴图,因为之前就没学会怎么搞图片)

Data Flow中文就是数据流,实际上Data Flow数据Control Flow中的一个任务。一个SSIS包中可以没有Data Flow但是不能没有Control Flow,例如SQL Server中的维护任务就可以是一个SSIS的包,用来备份数据或者维护索引,这种包就没有Data Flow。Data Flow主要适用于处理数据,在Data Flow中,数据从源读取写入目标,中间可以根据数据要求进行转换(结构,内容或类型)。数据从源到目的过程中,完全都是在内存中进行转换,也就是说一旦数据进到了Data Flow,在硬件上就只与内存、CPU缓存及CPU打交道了。Data Flow是依照Pipeline模式进行设计的,也就是说数据操作可以理解为逐行处理。当然数据读取和写入还是和这些数据源或目标有关系,可能也会有批的操作,但是转换上只能在内存里面逐条进行(部分操作除外,后面会提到)。如果是Pipeline模式,硬件上就需要有更快的CPU及内存的频率,这些对于性能来说很重要。但是如果不做特殊的设计和处理,系统很容易达到一个瓶颈,因为每一个pipeline只能够理由一个线程来操作,无论你的服务器有多少个核,只能发挥1个核的作用。当你发现其他的子系统压力也不大,但是SSIS转换的速率却不高的时候,基本上都是这种问题所致。

另外在由于数据通过pipeline中进行流动,如果有一个转换将所有的数据堵在内存里将极大的影响SSIS运行的效率,而且如果数据量非常大的时候可能会把内存耗尽,就像北京堵车一样,没有事故还可以,有了事故就全线飘红。SSIS可没有TempDB,物理内存没了就是没了。这种情况基本上都是在聚合组件和排序组件上,所以要尽量避免使用他们,除非万不得已,否则小心使用。

这个问题我觉得对于一般的开发人员来说不是很好解决,如果你对数据库及SSIS没有很深理解的话,基本上很难,而且这个问题是无法通过升级硬件来有效解决的(目前CPU的发展并不是高主频,而是更多核)。这个问题只能通过设计来解决,但是这可是一门学问,涉及的领域就是并行处理。并行处理需要考虑很多问题,包括数据源和数据目标的设计,控制流设计等。通过这些巧妙的设计,让我们的包能够通过更多的线程来运行更多的pipeline,当然这个更多2字并不是代表无穷多,因为当线程数量达到一定规模的时候,整个系统的瓶颈可能就会出现在其他的子系统上了,多数情况可能在网络IO或者磁盘IO上。

设计并行的SSIS包比较复杂,这里我给大家介绍一下RRD(Round-Robin Distributor)。这个组件目前还没有发布,但是可以讲讲这个组件的实现方式。实际上这个组件就是将输入pipeline根据你的data flow下层引出的执行线程数量动态的分发数据,这个功能类似于多播,只不过多播里面分发的数据是相同的,对同一条数据复制了多份并产生多个执行的线程,RRD上分发的数据是不同的。这样如果结合上适当的数据源就可以在data flow中实现并行。如果哪位在项目中需要使用可以联系我。

最近在和SSIS产品组交流的时候,我也提了一些建议,主要都是和性能相关,他们也应该与时俱进一下,让用户能够更好的利用一下多核。

项目终于有了自己的名字,维思实验室

Categories: 未分类
Tags: No Tags
Comments: No Comments
Published on: 2010 年 09 月 13 日

又折腾了1个月,新的版本release了,这次的UI改动比较大,接受了很多网友的意见。现在的实验都是系列了,以后可以搞一个从入门到精通的系列。而且后续还会添加微软云计算的内容。

前几天和开心抽烟的时候聊了一下HTML5,看似功能很强大,但是浏览器支持上还是有一些小问题,毕竟国内还有很多IE6(我们公司就有一堆)。另外HTML5能做的SL也都能做,让开发者不太好选择。有人说SL需要安装插件,但是Flash也需要,我们的那个平台也需要,所以我觉得插件根本不是问题,平台好才是真的好,管他有没有插件。我们的那个平台之所以没有使用SL,主要的原因还是SL太安全了,安全的有一些功能非常难实现(当然也不是绝对不能,但是相当困难),SL调用COM还是挺不靠谱的。HTML5我们更不敢用,至少现在不敢,虽然我们的平台需要客户端使用IE,但是至少还没有说需要IE9吧。

最近平台在做一个活动,找bug,提意见,领奖品,有兴趣的可以去http://www.tech-labs.net上面体验一下。

另外最近添加了SQL 2008 R2的实验。

我们第一个云项目

Categories: 未分类
Tags: No Tags
Comments: No Comments
Published on: 2010 年 08 月 23 日

经过半年多的努力第一个云项目及产品基本上快完成了,目前主要用在产品培训及教育领域。UI比之前的版本强了很多,而且速度也快了很多,ADSL的链路就会有很好的体验。现在开放了一个平台给技术爱好者使用,也算是公益事业了。如果有兴趣可以访问我们的这个智慧实验室,这个平台上目前提供了一些技术学习的资源,包含VS2010\ASP.NET MVC\F#等新技术。大家有兴趣可以上去看看。

Engine Restart

Categories: 未分类
Tags: No Tags
Comments: No Comments
Published on: 2010 年 08 月 18 日

好久没有写点东西了,不是没什么可以写的主要是工作很繁忙,事务太多了。之前1年的时间我们的团队做了一个半成品,但是由于市场及政策的限制,还没有到市场运作的阶段。虽然不是胎死腹中,也要在让他多在娘胎里面住上一段时间。

最近我们的一个团队在做一个私有云的项目,希望利用虚拟化技术将OS的功能发布出来。所有想法的源头来自于我们的一个客户,他们希望有一种基于互联网的IT体验,就是希望将OS的功能通过网络发布出来。我们考察了一些架构和体系,发现无论是SL还是Flash都无法真实的模拟OS的功能,尤其是在无法确认客户要求的时候,毕竟客户希望获得和OS一样的体验,不仅仅只是文件管理、网络浏览等,可能还有更多的应用需求。如果基于Flash或者SL开发这个平台,就必须为ISV提供一套健壮可靠的基础框架,这种方式的开发成本和应用移植的成本非常之大。很多WebOS的设计思想都是这样的,但是在上面运行的应用程序都是基于BS架构的,客户的应用场景不可能都是BS的,例如Visual Studio就很难做成BS的,虽然编辑器没什么问题但是调试器基本上很难模拟(也不是完全不可行,只是成本和效率不好保证)。当然那这种实现的好处就是运行时需要的资源比较少,SL和Flash都是运行在客户端,服务器端的开销比较少。另外一种可选择的方式就是基于虚拟化技术的VDI,很多公司都有实现,但是也都不是非常完美,一些特殊的场景也不是很好用。Windows Azure是一个好东西,但是它主要提供的是Hosting服务,也无法满足UI及WebOS的功能。MS的终端服务倒是一个能够提供客户端会话的技术,而且开销也相对较少,但是另外一个问题也无法解决,就是无法做到用户的完全隔离,毕竟都是运行在同一个OS上的不同会话,如果操作一些共享的配置及功能例如安装一些软件需要重启。

WebOS的设计就应该提供

  1. 完整的UI支持
  2. 对现有应用程序的支持
  3. 用户环境的隔离和独立
  4. 如果能够支持多浏览器最好

但是WebOS是永远不会替代本地OS的,因为从安全模型的角度来说无法让远程的代码来控制你本地的资源。其次就是现有的数十万的软件也无法全都移植到WebOS上,虽然SL、FLASH及HTML5已经功能提供的够丰富了,但是本地资源的利用还是本地操作系统实现的更好更简单。

我们的Engine终于可以restart了。

 

BTW:如果我们的项目发布了,我会在Blog发布体验的地址,大家用Live登录即可。

StreamInsight系列-QueryTemplate\QueryBinder\Query

Categories: 未分类
Tags: No Tags
Comments: No Comments
Published on: 2009 年 12 月 29 日

坚持每天写Blog真的是一件很困难的事情,到现在也没搞明白怎么在这个Blog上传图片和附件,只能辛苦大家读读文字和代码片段了。

QueryTemplate 就是我们在CEP服务器上预定义好并持续运行的分析和处理提交事件的业务逻辑,简单的来说就是业务逻辑的定义。

我们写程序的时候都喜欢搞内聚低藕和的东西,同样我们写程序的时候也喜欢把数据连接和查询逻辑分开,同样在StreamInsight中Adapter是负责数据的输入和输出,QueryTemplate的主要功能就是忽略Adapter是业务逻辑独立出来。例如

  CepStream<NetWorkPacket> packetStream = CepStream<NetWorkPacket>.Create("networkInput");
var avgcount = from oneminpacket in packetStream.AlterEventDuration
                   (e => TimeSpan.FromSeconds(1))
               group oneminpacket by oneminpacket.Destination into onegroup
               from eventwindow in onegroup.Snapshot()
               select new { destination = onegroup.Key, packagecount = eventwindow.Count(),packageAvgSize=eventwindow.Avg(e=>e.Length) };
          return application.CreateQueryTemplate("destinationOutput", avgcount);

这里的packetStream只是定义了事件流的泛型,但是我们不必考虑这个流怎么传入的

QueryBinder就是一个Helper,他将InputAdapter\OutputAdapter以及QueryTemplate绑定在一起,实现对业务处理的组装。

private static QueryBinder BindQuery(InputAdapter networkInputAdapter, OutputAdapter traceOutputAdapter, QueryTemplate queryTemplate)
        {
            QueryBinder queryBinder = new QueryBinder(queryTemplate);
            var networkConfig = new NetworkInputConfig
            {
                InterfaceAddress = "192.168.0.118",
                InputFieldOrders = new List<string>() {
                    "Length", "Source", "Destination", "SourcePort", "DestinationPort",
                    "Content"
                },
                CtiFrequency=3
            };
            var outputConfig = new TraceConfig()
            {
                DisplayCtiEvents = false,
                TraceName = "consoleout",
                TraceType = TraceTypeValue.Console
            };
            queryBinder.BindProducer("networkInput", networkInputAdapter, networkConfig, EventShape.Point);
            queryBinder.AddConsumer<TextFileOutputConfig>("queryresult", traceOutputAdapter, outputConfig, EventShape.Point,StreamEventOrder.ChainOrdered);

            return queryBinder;
        }

最后的Query就我们需要运行的处理实例。一旦Query运行,就会按照输入的事件进行处理并输出。

Query query = application.CreateQuery("Network_traffic_query", queryBinder, "Minute average count,filtered by address");

query.Start();

 

有了Adapter和今天提到的Query,你的StreamInsight程序就基本上差不多了,下次再写点Host和Debug的东西。

StreamInsight系列-Adapter

Categories: 未分类
Tags: No Tags
Comments: 4 Comments
Published on: 2009 年 12 月 25 日

坚持每天都有Post真的是一件很困难的事情,除非是那种宅男或是患有强烈的电脑依赖症。连续写东西真的很累。

今天聊聊StreamInsight中的Adapter,这是一个很大的话题,但是很重要,因为数据的输入和输出都要靠他了。开发Adapter不是很困难的事情,但是需要先了解一下Adapter的工作机制。Adapter是基于状态机的模式进行工作我们的代码主要是实现如果在不同状态间切换及如何从数据源中获取信息并构造事件数据。

 

当Adpater创建后通过Query的启动来调用Aapter的Start方法,之后AdapterState进入Running状态,这个时候我们需要通过代码实现对事件的创建及Enqueue操作,Enqueue会返回一个结果,表示队列的状态,如果状态为进入Suppended状态,当可用时StreamInsight会调用Resume()方法重新是Adapter进入Running状态。这个图上给出的是InputAdapter和OutputAdapter的状态图。

Adapter的停止将分为2个状态Stopping和Stopped,我们在处理和生成事件的时候要注意AdapterState,如果是Stopping需要对缓冲区的数据进行处理。

Adapter的创建需要通过工厂模式进行实例化。首先我们要实现IInputAdapterFactory\ITypeInputAdapterFactory或对应的OutputAdapterFactory,这个是成对出现的。这2中接口分别用于创建弱类型和强类型的AdapterFactory,我个人认为弱类型会灵活一些,强类型的性能稍好一些,并可以编译检查,具体使用需要结合需求。下面是我的一个AdpaterFactory的代码。其中的我只处理对于Point类型Event的处理,并返回NetworkPointInput这个Adapter。

public struct NetworkInputConfig
    {

//网络监听地址
        public string InterfaceAddress { get; set; }

//CTI
        public int CtiFrequency { get; set; }

//输入数据字段名称及顺序
        public List<string> InputFieldOrders { get; set; }
    }
    public  class NetworkInputFactory:IInputAdapterFactory<NetworkInputConfig>
    {
        public InputAdapterBase Create(NetworkInputConfig configInfo, EventShape eventShape, CepEventType cepEventType)
        {
            InputAdapterBase adapter = default(InputAdapterBase);
            if (eventShape == EventShape.Point)
            {
                adapter = new NetworkPointInput(configInfo, cepEventType);
            }
            else
            {
                throw new ArgumentException("The adapter cannot instance adapter with event shape {0}", eventShape.ToString());
            }
            return adapter;
        }
        public void Dispose()
        {
        }
    }

针对Adapter我们需要继承PointInputAdapter类,并重载Resume()和Start()方法。当然我们还需要写构造函数接受AdapterFactory中传入的config和cepEventType.

public NetworkPointInput(NetworkInputConfig configInfo, CepEventType cepEventType)
    {
        _bindtimeEventType = cepEventType;
        _ctiFrequency = configInfo.CtiFrequency;
        foreach (PcapDevice device in Pcap.GetAllDevices())
        {
            foreach (PcapAddress address in device.Addresses)
            {
                if (address.Addr.ipAddress.ToString() == configInfo.InterfaceAddress)
                {
                    _device = device;
                    break;
                }
            }
            if (_device != null)
                break;
        }
        if (_device == null)
            throw new ArgumentException("Interface address is not found");
        _inputOrdinalToCepOrdinal = new Dictionary<int, int>();
        //配置文件字段数量与CepEventType字段数量不一致
        if (configInfo.InputFieldOrders != null && configInfo.InputFieldOrders.Count != cepEventType.Fields.Count)
        {
            throw new ArgumentException(
                string.Format(CultureInfo.InvariantCulture,
                "The configuration element InputFieldOrders should have {0} elements, but it only has {1} elements",
                cepEventType.Fields.Count,
                configInfo.InputFieldOrders.Count));
        }
        CepEventTypeField engineField;
        for (int i = 0; i < cepEventType.Fields.Count; i++)
        {
            if (configInfo.InputFieldOrders != null)
            {
                if (!cepEventType.Fields.TryGetValue(configInfo.InputFieldOrders[i], out engineField))
                {
                    throw new ArgumentException(
    &#16
0;                   string.Format(CultureInfo.InvariantCulture,
                        "Event type {0} doesn’t have an input field named ‘{1}’", cepEventType.ShortName, configInfo.InputFieldOrders[i]));
                }

              _inputOrdinalToCepOrdinal.Add(i, engineField.Ordinal);
            }
            else
            {
                // Use default mapping
                _inputOrdinalToCepOrdinal.Add(i, i);
            }
        }
        if (!_device.Opened)
            _device.Open(true, 1000);
        _device.SetFilter("ip and tcp");
    }

我的程序引用了SharpPcap类库并安装Winpcap来监听网络数据。

构造函数还是很简单基本上是从配置文件中创建需要使用的一些内部对象。

public override void Resume()
        {
            ProduceEvents();
        }

public override void Start()
        {
            ProduceEvents();
        }

主要是这段ProduceEvents()

private void ProduceEvents()
{
    PointEvent currEvent = default(PointEvent);
    EnqueueOperationResult result = EnqueueOperationResult.Full;
    Packet packet = null;
    while (true)
    {
//判断设备状态

        if (!_device.Opened)
        {
            _device.Open(true, 1000);
            _device.SetFilter("ip and tcp");
        }

//循环获取网络数据包

        while ((packet = _device.GetNextPacket()) != null)
        {
            #region Adapter Stopping
            if (AdapterState.Stopping == AdapterState)
            {
                if (_pendingEvent != null)
                {
                    currEvent = _pendingEvent;
                    _pendingEvent = null;
                }
                PrepareToStop(currEvent);
                Stopped();
                return;
            }
            #endregion
            if (_pendingEvent != null)
            {
                currEvent = _pendingEvent;
                _pendingEvent = null;
            }
            else if (_pendingCtiTime != null)
            { }
            else
            {
                if (packet is TCPPacket)
                {
                    currEvent = CreateEventFromPacket(packet);
                    _pendingEvent = null;
                }
            }
            if (_pendingCtiTime != null)
            {
                result = EnqueueCtiEvent(_pendingCtiTime.Value);
                if (EnqueueOperationResult.Full == result)
                {
                    PrepareToResume(_pendingCtiTime.Value);
                    Ready();
                    return;
                }
                else
                {
                    _pendingCtiTime = null;
                }
            }
            else
            {
                DateTimeOffse
t currEventTime = currEvent.StartTime;
                result = Enqueue(ref currEvent);
                if (EnqueueOperationResult.Full == result)
                {
                    PrepareToResume(currEvent);
                    Ready();
                    return;
                }
                else
                {
                    _eventsEnqueued++;
                    _pendingEvent = null;
                    if (Zero == (_eventsEnqueued % _ctiFrequency))
                    {
                        DateTimeOffset currCtiTime = currEventTime + TimeSpan.FromTicks(NumberOfTicks);
                        result = EnqueueCtiEvent(currEventTime);
                        if (EnqueueOperationResult.Full == result)
                        {
                            PrepareToResume(currEventTime);
                            Ready();
                            return;
                        }
                        else
                        {
                            _pendingCtiTime = null;
                        }
                    }
                }
            }
        }
    }
}

private PointEvent CreateEventFromPacket(Packet packet)
        {
            PointEvent evt = CreateInsertEvent();
            evt.StartTime = DateTime.Now ;
            TCPPacket tcp = (TCPPacket)packet;

            string content = string.Empty;
            if (tcp.TCPData.Length > 0)
            {
                content = System.Text.ASCIIEncoding.ASCII.GetString(tcp.TCPData);
                if (content.Length > 256)
                    content = content.Substring(0, 256);
            }
            string[] data = new string[] {
                //Len
            packet.PcapHeader.CaptureLength.ToString(),
            tcp.SourceAddress.ToString(),
            tcp.DestinationAddress.ToString(),
            tcp.SourcePort.ToString(),
            tcp.DestinationPort.ToString(),
            content
                       };
            //populate the payload fields
            for (int ordinal = 0; ordinal < _bindtimeEventType.FieldsByOrdinal.Count; ordinal++)
            {
                try
                {
                    int cepOrdinal = _inputOrdinalToCepOrdinal[ordinal];
                    CepEventTypeField evtField = _bindtimeEventType.FieldsByOrdinal[cepOrdinal];
                    object value = Convert.ChangeType(data[ordinal], evtField.Type.ClrType);
                    evt.SetField(cepOrdinal, value);
                }
                catch (AdapterException e)
                {
        &
#160;           Console.WriteLine(e.Message + "\n" + e.StackTrace);
                }

            }
            return evt;
        }
        private void PrepareToResume(DateTimeOffset currCtiTime)
        {
            _pendingCtiTime = currCtiTime;
        }
        private void PrepareToStop(PointEvent currEvent)
        {
            if (null != currEvent)
            {
                ReleaseEvent(ref currEvent);
            }
            if (_device.Opened)
                _device.Close();
        }
        private void PrepareToResume(PointEvent currEvent)
        {
            _pendingEvent = currEvent;
        }
    }

这些都是最核心的部分了基本上所有的Adapter都差不多,不同的是Event如何创建,这里面我有一个方法CreateEventFromPacket()来创建Event。但是需要注意的是Event Payload中的字段最多是32个这个无法改,每个字段默认最长为512字节,目前我还不知道如何更改payload默认长度限制。

@Starbucks_Guangzhou

StreamInsight系列-What it is

Categories: 未分类
Tags: No Tags
Comments: 2 Comments
Published on: 2009 年 12 月 24 日

SQL Server是一套RDBMS,这个产品的功能就不多说了,现实世界中很多情况都会使用数据库作为数据的存储和分析平台,但是有些应用场景并不适合使用数据库进行分析和处理,尤其是对于处理量高、延时低的场景。目前的SQL Server中的数据是通过内存+磁盘的方式进行持久化,换句话来说,在SQL中的数据肯定会被持久,但是在制造、监控、电力、金融等领域中更多的数据可能不需要实时的进行持久化,而是需要通过这些数据进行实时监控,这是数据库的功能就会有很多限制。

Microsoft StreamInsight provides a powerful platform for developing and deploying complex event processing (CEP) applications. CEP is a technology for high-throughput, low-latency processing of event streams. Typical event stream sources include data from manufacturing applications, financial trading applications, Web analytics, or operational analytics. The StreamInsight stream processing architecture and the familiar .NET-based development platform enable developers to quickly implement robust and highly efficient event processing applications.

StreamInsight是一个框架

很多实时监控的事件处理无法使用数据库实现这些功能,因为数据的特性并不是交易行,而是事件,这时我们需要的是基于事件驱动的处理程序。StreamInsight就是这样一个框架,通过这个框架我们可以构建健壮的、高性能的事件处理程序,可以用于制造、金融、电力、互联网等对于实时数据分析要求很高的分析系统。

举个例子大家就能很好的理解了。一家汽车制造商会几条流水线对零部件组装,为了提高效率会使用很多机器人实现焊接及装配,为了提高产品合格率,每个机器人的机械臂都会有若干传感器监视焊点位置信息。传感器将感知机械臂每次运动的轨迹及焊点位置,并将信息传送给监控系统,为了提高产品质量,肯定需要监控程序观察传感器发出的信息并将信息与预先设计的指标进行匹配,这种信息处理的延时要求很高,而且吞吐量很大,10000 events/second,延时接近于0。如果事件信息与预先设计的指标发现偏差并且偏差值高于预先设置的范围,系统应该报警。这种系统的设计如果使用关系型数据库实现,性能将无法满足。StreamInsight就可以解决这种问题,事件数据通过队列的方式发送到StreamInsight组件中并存储在内存中直到满足业务规则后从输出中取出。

StreamInsight是用于处理事件的

在StreamInsight中有2类事件

  • Insert

Insert事件用于将event和payload添加到事件流中

Header
Payload

Event kind ::= INSERT

StartTime ::= datetime

EndTime ::= datetime

Field 1 … Field n as CLR types

  • CTI

CTI事件是一种特殊的事件用于指出事件的完成

Header

Event kind ::= CTI

StartTime ::= datetime

也就是说事件Insert到队列后需要添加一个CTI指出这个事件结束

在StreamInsight中事件有3模型

  • Point

这种事件的特性是基于时间点的payload,例如2009-12-24 3:00:00 温度 24

  • Interval

这种事件的特性是描述一段时间内的payload,例如 2009-12-24 3:00:00  2009-12-24 3:00:10 车流量 100

  • Edege

这种事件的特性是在记录中没有指定结束时间,也就是说记录中事件的类型,有Start和End

我们可以通过StreamInsight输入和输出这3中类型的事件。

 

StreamInsight 核心是基于Event的payload的查询

在StreamInsight中的核心功能就是根据规则对事件进行处理,主要是根据payload中的字段进行汇总、计算、过滤及投影,这些功能是基于对内存中的事件队列进行LINQ的查询,当然这些查询并不一定是hard code,可以通过编码做成动态调整的。

StreamInsight不是内存数据库

有人认为StreamInsight是内存数据库,我认为这个观点不是正确的。因为StreamInsight中只是用于处理事件的而且这种事件一定是基于事件的,虽然在payload中我们不用存储事件字段,但是系统会帮我们添加时间字段辅助系统处理。内存数据库需要实现更多的功能,例如启动数据的装载,持久等,并实现CRUD功能,而StreamInsight没有。但是在一些业务场景上可以使用StreamInsight实现内存数据库的功能。

今天先写到这里,明天继续写 How to do

RBS 补充解释

Categories: 未分类
Tags: No Tags
Comments: 1 Comment
Published on: 2009 年 11 月 09 日

在TechED09上我讲了一节关于非结构化数据存储的课程,其中里面提到了RBS,由于时间有限,有一些概念并没有讲清楚。

RBS只是一个框架,必须依赖于数据存储的Provider来工作,如果没有Provider,RBS无法存储数据。默认情况下RBS安装之后没有Provider,必须安装Provider,现在Provider其实并不多,正式发布的Provider不多,EMC有相应的Provider,微软也有一个基于Filestream的Provider,其他的Provider正在开发过程中。

我在演示中给大家看到的那个Provider,并不是微软正式发布的Provider,而是我们在项目中开发的一个Provider,这个Provider有负载均衡的功能,如果大家有这种项目可以联系微软。在MSDN上有专门关于RBS Provider的开发规范,大家只需要按照这个规范来实现就可以了。

RBS提供了公共的维护操作,可以时间GC、Delete Blob、Delete Pool等操作,客户端调用RBS类库的时候只需要执行读或写就可以了,删除的操作只需要将元数据中的记录删除就可以了,当运行公共维护程序的时候,RBS会调用GC及DeleteBlob、DeletePool来真正的删除Blob数据,这是一个异步过程,可以优化IO。

在SQL  Server 2008里的RBS是第一个版本,里面有一个很严重的问题,就是应用程序(.exe)必须直接引用RBS类库,否则无法运行,典型的应用场景就是ASP.NET应用程序无法调用RBS,这个问题在SQL 08R2的RBS中已经解决了。当然之前的那个问题也有其他的解决方案,例如文件先落地生成本地文件,利用本地文件做一个双向的缓存,或者建立一个WCF服务来访问RBS。

下面我讲讲这个Provider的设计思路。

在我们实现的Provider中主要使用了Filestream来存储数据并提供均衡功能。我们的设计中有一个数据库提供了整个存储系统的元数据管理,例如系统中包含哪些数据库,哪些Schema,每个Schema上表的存储位置,每个Schema的负载比例等。负载均衡其实就是读取当前可以写入Pool的表,然后随机数取模,随机或得一个可用的表,然后将Blob数据通过Filestream Win32API存储到这个表上。

在RBS中有一个概念叫做Pool,是存储的基本单元,在Pool内存储的就是Blob了。Pool可以与blob_store及Collection关联,blob_store与Provider关联,在使用的时候在RBS客户端可以选在将Blob存储在相应的Collection,这样就可以选则Provider了。每个Pool都有一个属性can_store_new_blobs,我们可以利用这个属性来进行很多配置,例如存档。在我们的Provider中,每个Pool都会对应存储在不同数据库中的很多表,每个表都有自己的2个文件组,一个用户存储Row Data,一个用于存储Blob数据,这样在备份的时候我们就有很多中可选择的方式。例如,每天备份Primary文件组,因为这里存储了元数据(表及文件组信息)及每天创建的文件组,这是一种滚动的增量更新的方式。当然还可以有其他的方式,例如,每天创建新的数据库并调整配置,可以将新的数据写入新的数据库等,这些方式取决于你的要求。

对于均衡,我在Demo的时候已经给大家看了,其实就是通过调整一个创建表的数量来调整的,每次修改这个值之后必须将现有的Pool can_store_new_blobs置为0,这样先一个存储请求将建立新的Pool,新的存储配置才能够生效。

另外,如果大家写程序的时候一定要注意CreatePool的这个操作。因为RBS是在客户端调用的就可能会出现多线程同时访问CreatePool的这个方法的情况。我们的解决方法是创建一个服务,将并行转为串行,只有第一个进入CreatePool服务的请求会真正创建,其他的都直接返回已经创建好的PoolId。

如果大家有什么关于RBS的问题可以直接发邮件至

«page 1 of 3
Welcome , today is 星期五, 2017 年 04 月 28 日