标签归档:数据库审计

Mssql数据库审计方案

SQL Server 2008R2版本实现方案:
1)数据库事务日志
在SQL Server数据库事务日志中,记录了每个事务的数据变更操作的详细信息。
2)Audit
之前做过的数据库蜜罐,针对单表监控增删改查行为并告警,就是使用的audit。
文章链接:

MSSQL数据库蜜罐测试


3)SQL Profiler
SQL Profiler是微软从SQL Server 2000开始引入的数据库引擎跟踪工具,具有使用界面操作的接口、使用SQL语句创建接口以及使用SMO编程创建接口。
4)Extended Event
Extended Event更加轻量级,性能消耗比SQL Profiler大幅降低,因此对用户系统性能影响也大幅减轻。对系统性能和吞吐量影响均在0.01%左右。

综合考虑可靠性、可维护性、系统开销和影响来看,使用Extended Event实现审计日志的方法是最优的选择。所以这里我们采用最优的方案Extended Event。

创建Extended Event Session并启用:

USE master
GO

CREATE EVENT SESSION [svrXEvent_User_Define_Testing] ON SERVER 
ADD EVENT sqlserver.sql_statement_completed
( 
	ACTION 
	( 
		sqlserver.database_id,
		sqlserver.session_id, 
		sqlserver.username, 
		sqlserver.client_hostname,
		sqlserver.client_app_name,
		sqlserver.sql_text, 
		sqlserver.plan_handle,
		sqlserver.tsql_stack,
		sqlserver.is_system,
		package0.collect_system_time
	) 
	WHERE sqlserver.username <> N'NT AUTHORITY\SYSTEM'
		AND sqlserver.username <> 'sa'
		AND sqlserver.is_system = 0		
)
ADD TARGET package0.asynchronous_file_target
( 
	SET 
		FILENAME = N'C:\Temp\svrXEvent_User_Define_Testing.xel', 
		METADATAFILE = N'C:\Temp\svrXEvent_User_Define_Testing.xem',
		MAX_FILE_SIZE = 10,
		MAX_ROLLOVER_FILES = 500
)
WITH (
	EVENT_RETENTION_MODE = NO_EVENT_LOSS,
	MAX_DISPATCH_LATENCY = 5 SECONDS,
    STARTUP_STATE=ON
);
GO


-- We need to enable event session to capture event and event data 
ALTER EVENT SESSION [svrXEvent_User_Define_Testing]
ON SERVER STATE = START;
GO

扩展事件创建完毕并启动以后,发生在SQL Server数据库服务端的所有sql_statement_completed事件信息都会被扩展事件异步滚动记录在日志文件svrXEvent_User_Define_Testing.xel文件中

每个事件以XML格式单行写入日志文件,因此我们可以采用SQL Server提供的动态管理函数sys.fn_xe_file_target_read_file来读取和分析日志文件。
SQL Server DMF sys.fn_xe_file_target_read_file ( path, mdpath, initial_file_name, initial_offset )
以下是使用DMF全量读取所有审计日志文件记录的例子:

USE master
GO

SELECT *
FROM sys.fn_xe_file_target_read_file('C:\Temp\svrXEvent_User_Define_Testing*.xel', 
		'C:\Temp\svrXEvent_User_Define_Testing*.xem', null, null)

结果如下:

详细的日志信息在Event_data字段中,使用下面的查询语句获取更为详细的信息:

-- This is SQL 2008R2
;WITH events_cte
AS (
	SELECT
		[event_data] = T.C.query('.'),
		[event_name] = T.C.value('(event/@name)[1]','varchar(100)'),
		[event_time] = DATEADD(mi, DATEDIFF(mi, GETUTCDATE(), CURRENT_TIMESTAMP),T.C.value('(event/@timestamp)[1]','datetime2')),
		[client app name] = T.C.value('(event/action[@name="client_app_name"]/value/text())[1]', 'sysname'),
		[client host name] = T.C.value('(event/action[@name="client_hostname"]/value/text())[1]', 'sysname'),
		[database_id]= T.C.value('(event/action[@name="database_id"]/value/text())[1]', 'int'),
		[cpu time (ms)] = T.C.value('(event/data[@name="cpu"]/value/text())[1]', 'bigint'),
		[logical reads] = T.C.value('(event/data[@name="reads"]/value/text())[1]', 'bigint'),
		[logical writes] = T.C.value('(event/data[@name="writes"]/value/text())[1]', 'bigint'),
		[duration (ms)] = T.C.value('(event/data[@name="duration"]/value/text())[1]', 'bigint'),
		[row count] = T.C.value('(event/data[@name="row_count"]/value/text())[1]', 'bigint'),
		[sql_text] = T.C.value('(event/action[@name="sql_text"]/value/text())[1]','nvarchar(max)'),
		[session_id] = T.C.value('(event/action[@name="session_id"]/value/text())[1]','int'),
		[user_name] = T.C.value('(event/action[@name="username"]/value/text())[1]','sysname'),
		[is_system] = T.C.value('(event/action[@name="is_system"]/value/text())[1]','sysname'),
		[query_timestamp] = T.C.value('(event/action[@name="collect_system_time"]/value/text())[1]','bigint'),
		[query_time] = DATEADD(mi, DATEDIFF(mi, GETUTCDATE(), CURRENT_TIMESTAMP),T.C.value('(event/action[@name="collect_system_time"]/text/text())[1]','datetime2'))
	FROM sys.fn_xe_file_target_read_file('C:\Temp\svrXEvent_User_Define_Testing*.xel', 
		'C:\Temp\svrXEvent_User_Define_Testing*.xem', null, null)
		CROSS APPLY (SELECT CAST(event_data as XML) AS event_data) as T(C)
)
SELECT 
	
	cte.session_id,
	--cte.query_timestamp,
	--cte.[event_time],
	cte.[query_time],
	--cte.[event_name],
	cte.user_name,
	[database_name] = db.name,
	cte.[database_id],
	cte.[client host name],
	
	cte.[logical reads],
	cte.[logical writes],
	cte.[cpu time (ms)],
	cte.[duration (ms)],
	--cte.[plan_handle],
	cte.sql_text,
	sql_text_hash = CHECKSUM(cte.sql_text),
	cte.[client app name],
	cte.[event_data],
	cte.is_system
FROM events_cte as cte
	LEFT JOIN sys.databases as db
	on cte.database_id = db.database_id
ORDER BY [query_time] ASC
;

 

结果如下:

从这个结果集中,我们可以很清楚的知道每一条SQL语句执行的详细情况,包括:用户名、执行时间点、客户机名、逻辑读、逻辑写、CPU消耗、执行时间消耗、查询语句详情等非常重要的信息。
另外也可以传入initial_file_name和initial_offset来实现从某个日志文件的特定offset(文件内容偏移量)开始读取。

 

参考文章:
http://mysql.taobao.org/monthly/2017/06/06/
http://mysql.taobao.org/monthly/2017/07/06/
http://mysql.taobao.org/monthly/2017/08/08/
https://github.com/elastic/beats/issues/149

【企业安全实战】数据库审计部署实践

0x01 概述


安全的核心是数据,数据库安全也是企业安全中很重要的一点,当然数据库安全涉及到很多方面,又衍生出很多安全产品,例如数据库审计、数据库防火墙、数据库加密、数据库脱敏等,本文主要阐述企业内部Mysql DB审计记录SQL执行的实现。按照部署方式分为以下几种:

1)流量镜像

旁路部署,透明部署,不影响网络拓扑,也不会造成额外的性能消耗。

2)DB Proxy

很多公司都有Mysql中间件,用于读写分离、负载均衡、监控等等。

3)DB Server部署Agent

在每台DB Server上安装Agent获取DB流量。

4)DB审计插件

安装插件记录增删改查语句,然后采集生成日志。

这里我们介绍一下比较流行的分析工具和插件。

 

0x02 DB审计插件


1、Mariadb Audit插件

从Mariadb 10.0版本开始audit插件直接内嵌了,名称为server_audit.so,可以直接加载使用。配置过程如下:

配置yum 数据源:

cd /etc/yum.repos.d/ 

vim /etc/yum.repos.d/MariaDB.repo

写入以下内容:

# MariaDB 10.0 CentOS repository list - created 2015-08-12 10:59 UTC 

# http://mariadb.org/mariadb/repositories/ 

[mariadb] 

name = MariaDB 

baseurl = http://yum.mariadb.org/10.0/centos6-amd64 

gpgkey=https://yum.mariadb.org/RPM-GPG-KEY-MariaDB 

gpgcheck=1

安装数据库:

yum -y install MariaDB-server MariaDB-client

启动数据库:

service mysql start

设置root密码:

mysqladmin -u root -p password 'hehe123'

安装审计插件:

MariaDB [(none)]> install plugin server_audit soname 'server_audit.so';

Query OK, 0 rows affected (0.00 sec)

MariaDB [(none)]> show variables like '%audit%';

+-------------------------------+-----------------------+

| Variable_name                 | Value                 |

+-------------------------------+-----------------------+

| server_audit_events           |                       |

| server_audit_excl_users       |                       |

| server_audit_file_path        | server_audit.log      |

| server_audit_file_rotate_now  | OFF                   |

| server_audit_file_rotate_size | 1000000               |

| server_audit_file_rotations   | 9                     |

| server_audit_incl_users       |                       |

| server_audit_logging          | OFF                   |

| server_audit_mode             | 0                     |

| server_audit_output_type      | file                  |

| server_audit_query_log_limit  | 1024                  |

| server_audit_syslog_facility  | LOG_USER              |

| server_audit_syslog_ident     | mysql-server_auditing |

| server_audit_syslog_info      |                       |

| server_audit_syslog_priority  | LOG_INFO              |

+-------------------------------+-----------------------+

15 rows in set (0.00 sec)

参数说明:

server_audit_output_type:指定日志输出类型,可为SYSLOG或FILE

server_audit_logging:启动或关闭审计

server_audit_events:指定记录事件的类型,可以用逗号分隔的多个值(connect,query,table),如果开启了查询缓存(query cache),查询直接从查询缓存返回数据,将没有table记录

server_audit_file_path:如server_audit_output_type为FILE,使用该变量设置存储日志的文件,可以指定目录,默认存放在数据目录的server_audit.log文件中

server_audit_file_rotate_size:限制日志文件的大小

server_audit_file_rotations:指定日志文件的数量,如果为0日志将从不轮转

server_audit_file_rotate_now:强制日志文件轮转

server_audit_incl_users:指定哪些用户的活动将记录,connect将不受此变量影响,该变量比server_audit_excl_users 优先级高

server_audit_syslog_facility:默认为LOG_USER,指定facility

server_audit_syslog_ident:设置ident,作为每个syslog记录的一部分

server_audit_syslog_info:指定的info字符串将添加到syslog记录

server_audit_syslog_priority:定义记录日志的syslogd priority

server_audit_excl_users:该列表的用户行为将不记录,connect将不受该设置影响

server_audit_mode:标识版本,用于开发测试

 

vim /etc/my.cnf.d/server.cnf

在[server]下添加:

server_audit_events='CONNECT,QUERY,TABLE'

server_audit_logging=ON     

server_audit_file_rotate_size = 10G   

server_audit_file_path='/tmp/server_audit.log'

执行数据库操作,审计到的内容如下:

20170307 08:39:55,kafka112,root,localhost,3,67,READ,test,test,

20170307 08:39:55,kafka112,root,localhost,3,67,QUERY,test,'select * from test',0

20170307 08:40:29,kafka112,root,localhost,3,0,DISCONNECT,test,,0

20170307 08:41:06,kafka112,root,localhost,4,0,FAILED_CONNECT,,,1045

20170307 08:41:06,kafka112,root,localhost,4,0,DISCONNECT,,,0

20170307 08:41:11,kafka112,root,localhost,5,0,CONNECT,,,0

20170307 08:41:11,kafka112,root,localhost,5,69,QUERY,,'select @@version_comment limit 1',0

20170307 08:41:22,kafka112,root,localhost,5,70,QUERY,,'show variables like \'%audit%\'',0

2Mysql audit plugin

Mysql audit plugin是Mcafee开源的Mysql审计工具,支持版本为MySQL (5.1, 5.5, 5.6, 5.7),MariaDB (5.5, 10.0, 10.1) ,Platform (32 or 64 bit)。

插件地址如下:

https://bintray.com/mcafee/mysql-audit-plugin/release

根据Mysql版本下载配对的版本插件。

https://bintray.com/mcafee/mysql-audit-plugin/release/1.1.4-725?versionPath=%2Fmcafee%2Fmysql-audit-plugin%2Frelease%2F1.1.4-725#files

下载解压后,在Lib目录下找到libaudit_plugin.so

查看plugin目录

mysql> show variables like '%plugin%';

+---------------+-------------------------+

| Variable_name | Value                   |

+---------------+-------------------------+

| plugin_dir    | /usr/lib64/mysql/plugin |

+---------------+-------------------------+

1 row in set (0.00 sec)

将libaudit_plugin.so复制到Plugin_dir下

安装插件:

mysql> INSTALL PLUGIN AUDIT SONAME 'libaudit_plugin.so';

Query OK, 0 rows affected (0.35 sec)

查看版本:

mysql> show global status like '%audit%';

+------------------------+-----------+

| Variable_name          | Value     |

+------------------------+-----------+

| Audit_protocol_version | 1.0       |

| Audit_version          | 1.1.4-725 |

+------------------------+-----------+

2 rows in set (0.01 sec)

开启审计功能

mysql> set global audit_json_file = ON;

Query OK, 0 rows affected (0.00 sec)

查看配置参数

mysql> show global variables like '%audit%'\G

*************************** 1. row ***************************

Variable_name: audit_before_after

        Value: after

*************************** 2. row ***************************

Variable_name: audit_checksum

        Value:

*************************** 3. row ***************************

Variable_name: audit_client_capabilities

        Value: OFF

*************************** 4. row ***************************

Variable_name: audit_delay_cmds

        Value:

*************************** 5. row ***************************

Variable_name: audit_delay_ms

        Value: 0

*************************** 6. row ***************************

Variable_name: audit_force_record_logins

        Value: OFF

*************************** 7. row ***************************

Variable_name: audit_header_msg

        Value: ON

*************************** 8. row ***************************

Variable_name: audit_json_file

        Value: ON

*************************** 9. row ***************************

Variable_name: audit_json_file_bufsize

        Value: 1

*************************** 10. row ***************************

Variable_name: audit_json_file_flush

        Value: OFF

*************************** 11. row ***************************

Variable_name: audit_json_file_retry

        Value: 60

*************************** 12. row ***************************

Variable_name: audit_json_file_sync

        Value: 0

*************************** 13. row ***************************

Variable_name: audit_json_log_file

        Value: mysql-audit.json

*************************** 14. row ***************************

Variable_name: audit_json_socket

        Value: OFF

*************************** 15. row ***************************

Variable_name: audit_json_socket_name

        Value: /var/run/db-audit/mysql.audit__var_lib_mysql_3306

*************************** 16. row ***************************

Variable_name: audit_json_socket_retry

        Value: 10

*************************** 17. row ***************************

Variable_name: audit_json_socket_write_timeout

        Value: 1000

*************************** 18. row ***************************

Variable_name: audit_offsets

        Value:

*************************** 19. row ***************************

Variable_name: audit_offsets_by_version

        Value: ON

*************************** 20. row ***************************

Variable_name: audit_password_masking_cmds

        Value: CREATE_USER,GRANT,SET_OPTION,SLAVE_START,CREATE_SERVER,ALTER_SERVER,CHANGE_MASTER,UPDATE

*************************** 21. row ***************************

Variable_name: audit_password_masking_regex

        Value: identified(?:/\*.*?\*/|\s)*?by(?:/\*.*?\*/|\s)*?(?:password)?(?:/\*.*?\*/|\s)*?['|"](?<psw>.*?)(?<!\\)['|"]|password(?:/\*.*?\*/|\s)*?\((?:/\*.*?\*/|\s)*?['|"](?<psw>.*?)(?<!\\)['|"](?:/\*.*?\*/|\s)*?\)|password(?:/\*.*?\*/|\s)*?(?:for(?:/\*.*?\*/|\s)*?\S+?)?(?:/\*.*?\*/|\s)*?=(?:/\*.*?\*/|\s)*?['|"](?<psw>.*?)(?<!\\)['|"]|password(?:/\*.*?\*/|\s)*?['|"](?<psw>.*?)(?<!\\)['|"]

*************************** 22. row ***************************

Variable_name: audit_record_cmds

        Value:

*************************** 23. row ***************************

Variable_name: audit_record_objs

        Value:

*************************** 24. row ***************************

Variable_name: audit_socket_creds

        Value: ON

*************************** 25. row ***************************

Variable_name: audit_uninstall_plugin

        Value: OFF

*************************** 26. row ***************************

Variable_name: audit_validate_checksum

        Value: ON

*************************** 27. row ***************************

Variable_name: audit_validate_offsets_extended

        Value: ON

*************************** 28. row ***************************

Variable_name: audit_whitelist_cmds

        Value: BEGIN,COMMIT,PING

*************************** 29. row ***************************

Variable_name: audit_whitelist_users

        Value:

29 rows in set (0.00 sec)

常用的参数:

1)audit_json_file

是否开启audit功能。

2)audit_json_log_file

日志文件路径。默认会在mysql data目录下,查看Data目录路径:

mysql> show variables like ‘datadir’;

+—————+—————–+

| Variable_name | Value           |

+—————+—————–+

| datadir       | /var/lib/mysql/ |

+—————+—————–+

1 row in set (0.00 sec)

3)audit_record_cmds

记录的操作类型,默认为记录所有命令。

mysql> set global audit_record_cmds=’select,insert,update,delete’;

Query OK, 0 rows affected (0.00 sec)

修改为默认

mysql> set global audit_record_cmds = NULL;

Query OK, 0 rows affected (0.00 sec)

4) audit_record_objs

audit记录操作的对象,默认为所有。

mysql> set global audit_record_objs = ‘test.*’;

Query OK, 0 rows affected (0.00 sec)

修改为默认

mysql> set global audit_record_objs = NULL;

Query OK, 0 rows affected (0.00 sec)

5) audit_whitelist_users

用户白名单。

 

这里测试一下,Mysql执行show databases;,日志如下:

{"msg-type":"activity","date":"1502854234067","thread-id":"2","query-id":"15","user":"root","priv_user":"root","host":"localhost","pid":"14373","os_user":"root","appname":"mysql","rows":"5","cmd":"show_databases","objects":[{"db":"information_schema","name":"/tmp/#sql_37a1_0","obj_type":"TABLE"}],"query":"show databases"}

 

0x03 流量镜像


镜像DB交换机接口双向流量,不影响当前网络架构,这里我们介绍两种方式。

1、Packetbeat

Packetbeat通过嗅探应用服务器之间的网络通讯,来解码应用层协议类型如HTTP、MySQL、redis等等,关联请求与响应,并记录每个事务有意义的字段。我们可以部署在Mysql Server上,也可以部署在流量镜像服务器。部署方式如下:

yum -y install libpcap

./packetbeat -c packetbeat.yml

packetbeat.yml为配置文件

packetbeat.template.json为mapping文件

测试:

mysql> select host,user from mysql.user;

+-----------+------+

| host      | user |

+-----------+------+

| %         | root |

| %         | soc  |

| 127.0.0.1 | root |

| localhost |      |

| localhost | soc  |

+-----------+------+

5 rows in set (0.00 sec)

输出到Elasticsearch内容如下:

query:执行的SQL语句

num_fields:返回的字段数

num_rows:查询结果行数

 

2、MySQL Sniffer

MySQL Sniffer 是一个基于 MySQL 协议的抓包工具,实时抓取 MySQLServer 端或 Client 端请求,并格式化输出。输出内容包括访问时间、访问用户、来源 IP、访问 Database、命令耗时、返回数据行数、执行语句等。有批量抓取多个端口,后台运行,日志分割等多种使用方式,操作便捷,输出友好。

安装依赖:

yum install glib2-devel libpcap-devel libnet-devel

项目下载地址:

https://github.com/Qihoo360/mysql-sniffer

安装步骤:

cd mysql-sniffer

mkdir proj

cd proj

cmake ../

make

cd bin/

参数如下:

[root@server120 bin]# ./mysql-sniffer -h

Usage ./mysql-sniffer [-d] -i eth0 -p 3306,3307,3308 -l /var/log/mysql-sniffer/ -e stderr

         [-d] -i eth0 -r 3000-4000

         -d daemon mode.

         -s how often to split the log file(minute, eg. 1440). if less than 0, split log everyday

         -i interface. Default to eth0

         -p port, default to 3306. Multiple ports should be splited by ','. eg. 3306,3307

            this option has no effect when -f is set.

         -r port range, Don't use -r and -p at the same time

         -l query log DIRECTORY. Make sure that the directory is accessible. Default to stdout.

         -e error log FILENAME or 'stderr'. if set to /dev/null, runtime error will not be recorded

         -f filename. use pcap file instead capturing the network interface

         -w white list. dont capture the port. Multiple ports should be splited by ','.

         -t truncation length. truncate long query if it's longer than specified length. Less than 0 means no truncation

         -n keeping tcp stream count, if not set, default is 65536. if active tcp count is larger than the specified count, mysql-sniffer will remove the oldest one

测试:

[root@server120 bin]# ./mysql-sniffer -i lo -p 3306

2017-08-16 13:56:04  root     127.0.0.1     NULL              0ms              1  select @@version_comment limit 1

2017-08-16 14:01:56  root     127.0.0.1     NULL              0ms             1  SELECT DATABASE()

2017-08-16 14:01:56  root     127.0.0.1     mysql            0ms              0  use mysql

2017-08-16 14:01:56  root     127.0.0.1     mysql            0ms              5  show databases

2017-08-16 14:01:56  root     127.0.0.1     mysql            0ms            23 show tables

2017-08-16 14:02:04  root     127.0.0.1     mysql            0ms              8  select * from user

输出格式为:时间,访问用户,来源 IP,访问 Database,命令耗时,返回数据行数,执行语句。

保存日志可以用filebeat采集:

[root@server120 bin]# ./mysql-sniffer -i eth0 -p 3306 -l /tmp/

[root@server120 tmp]# head -n 5 3306.log

2017-08-16 14:04:58  root     192.168.190.201       NULL              0ms              0  SET NAMES utf8

2017-08-16 14:04:58  root     192.168.190.201       NULL              0ms              2  SHOW VARIABLES LIKE 'lower_case_%'

2017-08-16 14:04:58  root     192.168.190.201       NULL              0ms              1  SHOW VARIABLES LIKE 'profiling'

2017-08-16 14:04:58  root     192.168.190.201       NULL              0ms              5  SHOW DATABASES

2017-08-16 14:05:20  root     192.168.190.201       NULL              0ms              0  SET NAMES utf8

-l 指定日志输出路径,日志文件将以 port.log 命名。

需要注意的是:

只能抓取新建的链接,如果是之前创建的链接将获取不到用户名和库名,并有一定几率丢包。

PS:

同事在使用DVWA测试的时候发现抓不到3306的包,原来是因为DVWA配置的数据库源是localhost,其实localhost和127.0.0.1不一样。

localhot不经网卡传输,它不受网络防火墙和网卡相关的的限制。

127.0.0.1是通过网卡传输,依赖网卡,并受到网络防火墙和网卡相关的限制。

 

0x04 拖库检测


根据SQL注入的特征,我们可以从以下角度分析:

1)QPS基线,例如通常凌晨的业务量小,QPS会比较低,而如果日志数突然增多,那么很有可能存在拖库行为。

2)特征匹配,这里就有些类似于WAF,例如通常Mysql注入会查询Information_schema,SQLMAP托库语句使用大量IFNULL、ORD、MID、CAST函数,时间盲注中用到的sleep和benchmark且命令耗时长,报错注入中用到的floor和updatexml等等。日志使用Logstash采集的时候,命中相关规则则打标,使用ElastALert监控单位时间内异常日志数量超过阈值则告警。

当然这是通用做法,如果对业务熟悉之后可以添加更细致准确的检测规则。