Tag archive: Audit

Building a restricted shell environment with lshell

lshell builds a restricted shell environment. It is written in Python and provides, among other things:
1) restricting which commands can be used
2) logging the commands a user executes
3) restricting which directories can be accessed
4) restricting environment variables
...

Installation steps:
Download the source from https://github.com/ghantoos/lshell
python setup.py install --no-compile --install-scripts=/usr/bin/
cp /etc/lshell.conf /usr/local/etc/

Configuration:
The default configuration file ships with a [default] section; you can additionally create [username] or [grp:groupname] sections to customize settings per user or group.
The precedence is:
a. user configuration
b. group configuration
c. default configuration

Change the user's login shell:
chsh -s /usr/bin/lshell user_name
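
On many distributions chsh refuses a shell that is not listed in /etc/shells, so lshell may need to be registered first. A minimal sketch, assuming lshell was installed to /usr/bin/lshell as above:

# register lshell as a valid login shell if it is not listed yet, then assign it
grep -qx '/usr/bin/lshell' /etc/shells || echo '/usr/bin/lshell' >> /etc/shells
chsh -s /usr/bin/lshell user_name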

Configuration file parameters:

# lshell.py configuration file
#
# $Id: lshell.conf,v 1.27 2010-10-18 19:05:17 ghantoos Exp $

[global]
##  log directory (default /var/log/lshell/ )
logpath         : /var/log/lshell/
##  set log level to 0, 1, 2, 3 or 4  (0: no logs, 1: least verbose,
##                                                 4: log all commands)
loglevel        : 2
##  configure log file name (default is %u i.e. username.log)
#logfilename     : %y%m%d-%u
#logfilename     : syslog

##  in case you are using syslog, you can choose your logname
#syslogname      : myapp

##  Set path to sudo noexec library. This path is usually autodetected, only
##  set this variable to use alternate path. If set and the shared object is
##  not found, lshell will exit immediately. Otherwise, please check your logs
##  to verify that a standard path is detected.
##
##  while this should not be a common practice, setting this variable to an empty
##  string will disable LD_PRELOAD prepend of the commands. This is done at your
##  own risk, as lshell becomes easily breached using some commands like find(1)
##  using the -exec flag.
#path_noexec     : /usr/libexec/sudo_noexec.so

## include a directory containing multiple configuration files. These files
## can only contain default/user/group configuration. The global configuration will
## only be loaded from the default configuration file.
## e.g. splitting users into separate files
#include_dir     : /etc/lshell.d/*.conf

[default]
##  a list of the allowed commands without execution privileges or 'all' to
##  allow all commands in user's PATH
##
##  if  sudo(8) is installed and sudo_noexec.so is available, it will be loaded
##  before running every command, preventing it from  running  further  commands
##  itself. If not available, beware of commands like vim/find/more/etc. that
##  will allow users to execute code (e.g. /bin/sh) from within the application,
##  thus easily escaping lshell. See variable 'path_noexec' to use an alternative
##  path to library.
allowed         : ['ls', 'echo','ll']

##  A list of the allowed commands that are permitted to execute other
##  programs (e.g. shell scripts with exec(3)). Setting this variable to 'all'
##  is NOT allowed. Warning do not put here any command that can execute
##  arbitrary commands (e.g. find, vim, xargs)
##
##  Important: commands defined in 'allowed_shell_escape' override their
##  definition in the 'allowed' variable
#allowed_shell_escape        : ['man','zcat']

##  a list of forbidden characters or commands
forbidden       : [';', '&', '|','`','>','<', '$(', '${']

##  a list of allowed command to use with sudo(8)
##  if set to 'all', all the 'allowed' commands will be accessible through sudo(8)
#sudo_commands   : ['ls', 'more']

##  number of warnings when user enters a forbidden value before getting 
##  exited from lshell, set to -1 to disable.
warning_counter : 2

##  command aliases list (similar to bash's alias directive)
aliases         : {'ll':'ls -l'}

##  introduction text to print (when entering lshell)
#intro           : "== My personal intro ==\nWelcome to lshell\nType '?' or 'help' to get the list of allowed commands"

##  configure your prompt using %u or %h (default: username)
#prompt          : "%u@%h"

##  set short prompt current directory update (default: 0)
#prompt_short    : 0

##  a value in seconds for the session timer
#timer           : 5

##  list of paths to restrict the user "geographically"
##  warning: many commands like vi and less allow to break this restriction
#path            : ['/home/bla/','/etc']

##  set the home folder of your user. If not specified the home_path is set to 
##  the $HOME environment variable
#home_path       : '/home/bla/'

##  update the environment variable $PATH of the user
#env_path        : ':/usr/local/bin:/usr/sbin'

##  a list of paths; all executable files inside these paths will be allowed
#allowed_cmd_path: ['/home/bla/bin','/home/bla/stuff/libexec']

##  add environment variables
#env_vars        : {'foo':1, 'bar':'helloworld'}

##  allow or forbid the use of scp (set to 1 or 0)
#scp             : 1

## forbid scp upload
#scp_upload       : 0

## forbid scp download
#scp_download     : 0

##  allow or forbid the use of sftp (set to 1 or 0)
##  this option will not work if you are using OpenSSH's internal-sftp service
#sftp            : 1

##  list of command allowed to execute over ssh (e.g. rsync, rdiff-backup, etc.)
#overssh         : ['ls', 'rsync']

##  logging strictness. If set to 1, any unknown command is considered as 
##  forbidden, and user's warning counter is decreased. If set to 0, command is
##  considered as unknown, and user is only warned (i.e. *** unknown syntax)
strict          : 0

##  force files sent through scp to a specific directory
#scpforce        : '/home/bla/uploads/'

##  Enable support for WinSCP with scp mode (NOT sftp)
##  When enabled, the following parameters will be overridden:
##    - scp_upload: 1 (uses scp(1) from within session)
##    - scp_download: 1 (uses scp(1) from within session)
##    - scpforce - Ignore (uses scp(1) from within session)
##    - forbidden: -[';']
##    - allowed: +['scp', 'env', 'pwd', 'groups', 'unset', 'unalias']
#winscp: 0

##  history file maximum size 
#history_size     : 100

##  set history file name (default is /home/%u/.lhistory)
#history_file     : "/home/%u/.lshell_history"

##  define the script to run at user login
#login_script     : "/path/to/myscript.sh"

## disable user exit, this could be useful when lshell is spawned from another
## non-restricted shell (e.g. bash)
#disable_exit      : 0

Configuration example:
foo:
1) may access /usr and /var, but not /usr/local
2) may run every command except su
3) home directory is /home/users

bar:
1) may access /usr and /etc, but not /usr/local
2) may run ping in addition to the default commands, but not ls
3) strict is enabled (1: every unknown command decreases the warning counter; 0: unknown commands only produce a warning and do not decrease the counter)

The configuration file looks like this:

# CONFIGURATION START
[global]
logpath         : /var/log/lshell/
loglevel        : 2

[default]
allowed         : ['ls','pwd']
forbidden       : [';', '&', '|'] 
warning_counter : 2
timer           : 0
path            : ['/etc', '/usr']
env_path        : ':/sbin:/usr/foo'
scp             : 1 # or 0
sftp            : 1 # or 0
overssh         : ['rsync','ls']
aliases         : {'ls':'ls --color=auto','ll':'ls -l'}

[grp:users]
warning_counter : 5
overssh         : - ['ls']

[foo]
allowed         : 'all' - ['su']
path            : ['/var', '/usr'] - ['/usr/local']
home_path       : '/home/users'

[bar]
allowed         : + ['ping'] - ['ls'] 
path            : - ['/usr/local']
strict          : 1
scpforce        : '/home/bar/uploads/'
# CONFIGURATION END
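
A quick way to sanity-check the result (assuming the users foo and bar exist and already have lshell as their login shell):

# switch to the restricted account and try an allowed and a forbidden command
su - foo
#   foo:~$ ls      -> should work
#   foo:~$ su      -> should be rejected as forbidden
# then review the per-user log (what gets recorded depends on loglevel)
tail /var/log/lshell/foo.log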

MSSQL database audit options

Approaches available on SQL Server 2008 R2:
1) Database transaction log
The SQL Server transaction log records the details of every data modification made by each transaction.
2) Audit
The database honeypot described earlier, which monitors insert/update/delete/select activity on a single table and raises alerts, was built on Audit.
Related post: MSSQL database honeypot testing (included later in this archive)
3) SQL Profiler
SQL Profiler is the database engine tracing tool Microsoft has shipped since SQL Server 2000; traces can be created through its GUI, through T-SQL, or programmatically via SMO.
4) Extended Events
Extended Events is much more lightweight; its overhead is far lower than SQL Profiler's, so the impact on the user's system is greatly reduced — on the order of 0.01% for both performance and throughput.

Weighing reliability, maintainability, overhead and impact, Extended Events is the best way to implement audit logging, so that is the approach used here.

Create and enable the Extended Event session:

USE master
GO

CREATE EVENT SESSION [svrXEvent_User_Define_Testing] ON SERVER 
ADD EVENT sqlserver.sql_statement_completed
( 
	ACTION 
	( 
		sqlserver.database_id,
		sqlserver.session_id, 
		sqlserver.username, 
		sqlserver.client_hostname,
		sqlserver.client_app_name,
		sqlserver.sql_text, 
		sqlserver.plan_handle,
		sqlserver.tsql_stack,
		sqlserver.is_system,
		package0.collect_system_time
	) 
	WHERE sqlserver.username <> N'NT AUTHORITY\SYSTEM'
		AND sqlserver.username <> 'sa'
		AND sqlserver.is_system = 0		
)
ADD TARGET package0.asynchronous_file_target
( 
	SET 
		FILENAME = N'C:\Temp\svrXEvent_User_Define_Testing.xel', 
		METADATAFILE = N'C:\Temp\svrXEvent_User_Define_Testing.xem',
		MAX_FILE_SIZE = 10,
		MAX_ROLLOVER_FILES = 500
)
WITH (
	EVENT_RETENTION_MODE = NO_EVENT_LOSS,
	MAX_DISPATCH_LATENCY = 5 SECONDS,
    STARTUP_STATE=ON
);
GO


-- We need to enable event session to capture event and event data 
ALTER EVENT SESSION [svrXEvent_User_Define_Testing]
ON SERVER STATE = START;
GO

Once the extended event session is created and started, every sql_statement_completed event that occurs on the SQL Server instance is asynchronously written, with file rollover, to the log file svrXEvent_User_Define_Testing.xel.

Each event is stored as a single XML document, so the dynamic management function sys.fn_xe_file_target_read_file provided by SQL Server can be used to read and analyze the log files.
Signature: sys.fn_xe_file_target_read_file ( path, mdpath, initial_file_name, initial_offset )
The following example uses the DMF to read all records from all audit log files:

USE master
GO

SELECT *
FROM sys.fn_xe_file_target_read_file('C:\Temp\svrXEvent_User_Define_Testing*.xel', 
		'C:\Temp\svrXEvent_User_Define_Testing*.xem', null, null)

The results look like the following (screenshot not included here).

The detailed log information lives in the event_data column; the query below extracts it into individual fields:

-- This is SQL 2008R2
;WITH events_cte
AS (
	SELECT
		[event_data] = T.C.query('.'),
		[event_name] = T.C.value('(event/@name)[1]','varchar(100)'),
		[event_time] = DATEADD(mi, DATEDIFF(mi, GETUTCDATE(), CURRENT_TIMESTAMP),T.C.value('(event/@timestamp)[1]','datetime2')),
		[client app name] = T.C.value('(event/action[@name="client_app_name"]/value/text())[1]', 'sysname'),
		[client host name] = T.C.value('(event/action[@name="client_hostname"]/value/text())[1]', 'sysname'),
		[database_id]= T.C.value('(event/action[@name="database_id"]/value/text())[1]', 'int'),
		[cpu time (ms)] = T.C.value('(event/data[@name="cpu"]/value/text())[1]', 'bigint'),
		[logical reads] = T.C.value('(event/data[@name="reads"]/value/text())[1]', 'bigint'),
		[logical writes] = T.C.value('(event/data[@name="writes"]/value/text())[1]', 'bigint'),
		[duration (ms)] = T.C.value('(event/data[@name="duration"]/value/text())[1]', 'bigint'),
		[row count] = T.C.value('(event/data[@name="row_count"]/value/text())[1]', 'bigint'),
		[sql_text] = T.C.value('(event/action[@name="sql_text"]/value/text())[1]','nvarchar(max)'),
		[session_id] = T.C.value('(event/action[@name="session_id"]/value/text())[1]','int'),
		[user_name] = T.C.value('(event/action[@name="username"]/value/text())[1]','sysname'),
		[is_system] = T.C.value('(event/action[@name="is_system"]/value/text())[1]','sysname'),
		[query_timestamp] = T.C.value('(event/action[@name="collect_system_time"]/value/text())[1]','bigint'),
		[query_time] = DATEADD(mi, DATEDIFF(mi, GETUTCDATE(), CURRENT_TIMESTAMP),T.C.value('(event/action[@name="collect_system_time"]/text/text())[1]','datetime2'))
	FROM sys.fn_xe_file_target_read_file('C:\Temp\svrXEvent_User_Define_Testing*.xel', 
		'C:\Temp\svrXEvent_User_Define_Testing*.xem', null, null)
		CROSS APPLY (SELECT CAST(event_data as XML) AS event_data) as T(C)
)
SELECT 
	
	cte.session_id,
	--cte.query_timestamp,
	--cte.[event_time],
	cte.[query_time],
	--cte.[event_name],
	cte.user_name,
	[database_name] = db.name,
	cte.[database_id],
	cte.[client host name],
	
	cte.[logical reads],
	cte.[logical writes],
	cte.[cpu time (ms)],
	cte.[duration (ms)],
	--cte.[plan_handle],
	cte.sql_text,
	sql_text_hash = CHECKSUM(cte.sql_text),
	cte.[client app name],
	cte.[event_data],
	cte.is_system
FROM events_cte as cte
	LEFT JOIN sys.databases as db
	on cte.database_id = db.database_id
ORDER BY [query_time] ASC
;

 

The results look like the following (screenshot not included here).

From this result set we can see exactly how each SQL statement was executed, including the user name, execution time, client host name, logical reads, logical writes, CPU time, duration, the statement text itself, and other important details.
The initial_file_name and initial_offset parameters can also be supplied to start reading from a specific offset within a specific log file.
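
A sketch of such an incremental read driven from the shell with sqlcmd (server name, file name and offset are illustrative; in practice the last file_name/file_offset values returned by the previous call would be fed back in):

# poll the XE file target incrementally instead of re-reading everything
LASTFILE='C:\Temp\svrXEvent_User_Define_Testing_0_130000000000000000.xel'   # from the previous read, illustrative
LASTOFFSET=11776                                                            # from the previous read, illustrative
sqlcmd -S myserver -E -Q "SELECT file_name, file_offset, event_data
  FROM sys.fn_xe_file_target_read_file('C:\Temp\svrXEvent_User_Define_Testing*.xel',
       'C:\Temp\svrXEvent_User_Define_Testing*.xem', '$LASTFILE', $LASTOFFSET);"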

 


[Enterprise security in practice] Deploying database auditing

0x01 Overview

Data is at the core of security, and database security is an important part of enterprise security. It covers many areas and has spawned many products — database audit, database firewall, database encryption, data masking, and so on. This post focuses on how to audit SQL execution on in-house MySQL servers. By deployment model, the options are:

1) Traffic mirroring

Deployed out of band and transparently; it does not change the network topology and adds no extra load on the database.

2) DB proxy

Many companies already run a MySQL middleware layer for read/write splitting, load balancing, monitoring, and so on.

3) Agent on the DB server

Install an agent on every DB server to capture database traffic.

4) DB audit plugin

Install a plugin that records insert/update/delete/select statements, then collect the resulting logs.

Below we walk through some of the more popular tools and plugins.

 

0x02 DB audit plugins

1. MariaDB Audit Plugin

Starting with MariaDB 10.0 the audit plugin ships with the server as server_audit.so and can be loaded directly. Setup:

Configure the yum repository:

cd /etc/yum.repos.d/
vim /etc/yum.repos.d/MariaDB.repo

with the following content:

# MariaDB 10.0 CentOS repository list - created 2015-08-12 10:59 UTC
# http://mariadb.org/mariadb/repositories/
[mariadb]
name = MariaDB
baseurl = http://yum.mariadb.org/10.0/centos6-amd64
gpgkey=https://yum.mariadb.org/RPM-GPG-KEY-MariaDB
gpgcheck=1

Install the database:

yum -y install MariaDB-server MariaDB-client

Start it:

service mysql start

Set the root password:

mysqladmin -u root -p password 'hehe123'

Install the audit plugin:

MariaDB [(none)]> install plugin server_audit soname 'server_audit.so';

Query OK, 0 rows affected (0.00 sec)

MariaDB [(none)]> show variables like '%audit%';

+-------------------------------+-----------------------+
| Variable_name                 | Value                 |
+-------------------------------+-----------------------+
| server_audit_events           |                       |
| server_audit_excl_users       |                       |
| server_audit_file_path        | server_audit.log      |
| server_audit_file_rotate_now  | OFF                   |
| server_audit_file_rotate_size | 1000000               |
| server_audit_file_rotations   | 9                     |
| server_audit_incl_users       |                       |
| server_audit_logging          | OFF                   |
| server_audit_mode             | 0                     |
| server_audit_output_type      | file                  |
| server_audit_query_log_limit  | 1024                  |
| server_audit_syslog_facility  | LOG_USER              |
| server_audit_syslog_ident     | mysql-server_auditing |
| server_audit_syslog_info      |                       |
| server_audit_syslog_priority  | LOG_INFO              |
+-------------------------------+-----------------------+
15 rows in set (0.00 sec)

Parameter notes:

server_audit_output_type: log output type, SYSLOG or FILE
server_audit_logging: enable or disable auditing
server_audit_events: which event types to record, as a comma-separated list (connect, query, table); note that if the query cache is enabled and a query is answered directly from it, no table event is recorded
server_audit_file_path: when server_audit_output_type is FILE, the file (or directory) the log is written to; defaults to server_audit.log in the data directory
server_audit_file_rotate_size: maximum size of a log file
server_audit_file_rotations: number of rotated log files to keep; 0 means the log is never rotated
server_audit_file_rotate_now: force a log rotation now
server_audit_incl_users: only activity from these users is recorded; connect events are not affected; takes precedence over server_audit_excl_users
server_audit_syslog_facility: syslog facility, LOG_USER by default
server_audit_syslog_ident: ident string attached to each syslog record
server_audit_syslog_info: an info string appended to each syslog record
server_audit_syslog_priority: syslogd priority used for the records
server_audit_excl_users: activity from these users is not recorded; connect events are not affected
server_audit_mode: identifies the version; intended for development and testing
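
Most of these variables are dynamic, so they can also be changed at runtime from the shell without a restart (a sketch, assuming the variables are dynamic in your plugin version); for a persistent setup, put them in the config file as shown next:

mysql -uroot -p -e "SET GLOBAL server_audit_events='CONNECT,QUERY,TABLE';
                    SET GLOBAL server_audit_file_path='/tmp/server_audit.log';
                    SET GLOBAL server_audit_logging=ON;"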

 

vim /etc/my.cnf.d/server.cnf

Add under the [server] section:

server_audit_events='CONNECT,QUERY,TABLE'
server_audit_logging=ON
server_audit_file_rotate_size = 10G
server_audit_file_path='/tmp/server_audit.log'

Run a few statements against the database; the audit log then contains entries like:

20170307 08:39:55,kafka112,root,localhost,3,67,READ,test,test,
20170307 08:39:55,kafka112,root,localhost,3,67,QUERY,test,'select * from test',0
20170307 08:40:29,kafka112,root,localhost,3,0,DISCONNECT,test,,0
20170307 08:41:06,kafka112,root,localhost,4,0,FAILED_CONNECT,,,1045
20170307 08:41:06,kafka112,root,localhost,4,0,DISCONNECT,,,0
20170307 08:41:11,kafka112,root,localhost,5,0,CONNECT,,,0
20170307 08:41:11,kafka112,root,localhost,5,69,QUERY,,'select @@version_comment limit 1',0
20170307 08:41:22,kafka112,root,localhost,5,70,QUERY,,'show variables like \'%audit%\'',0

2. MySQL Audit Plugin

The MySQL Audit Plugin is McAfee's open-source MySQL auditing tool. It supports MySQL 5.1, 5.5, 5.6 and 5.7 and MariaDB 5.5, 10.0 and 10.1, on 32- or 64-bit platforms.

Plugin releases:

https://bintray.com/mcafee/mysql-audit-plugin/release

Download the build that matches your MySQL version, e.g.:

https://bintray.com/mcafee/mysql-audit-plugin/release/1.1.4-725?versionPath=%2Fmcafee%2Fmysql-audit-plugin%2Frelease%2F1.1.4-725#files

After downloading and extracting it, libaudit_plugin.so is in the lib directory.

Check the plugin directory:

mysql> show variables like '%plugin%';

+---------------+-------------------------+
| Variable_name | Value                   |
+---------------+-------------------------+
| plugin_dir    | /usr/lib64/mysql/plugin |
+---------------+-------------------------+
1 row in set (0.00 sec)

Copy libaudit_plugin.so into plugin_dir.
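
A sketch of the extract-and-copy step from the shell (the archive name is illustrative; use the build matching your MySQL version and your own plugin_dir):

tar xzf audit-plugin-mysql-5.6-1.1.4-725-linux-x86_64.tar.gz      # illustrative file name
cp audit-plugin-mysql-5.6/lib/libaudit_plugin.so /usr/lib64/mysql/plugin/
chown mysql:mysql /usr/lib64/mysql/plugin/libaudit_plugin.so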

Install the plugin:

mysql> INSTALL PLUGIN AUDIT SONAME 'libaudit_plugin.so';
Query OK, 0 rows affected (0.35 sec)

Check the version:

mysql> show global status like '%audit%';
+------------------------+-----------+
| Variable_name          | Value     |
+------------------------+-----------+
| Audit_protocol_version | 1.0       |
| Audit_version          | 1.1.4-725 |
+------------------------+-----------+
2 rows in set (0.01 sec)

Enable auditing:

mysql> set global audit_json_file = ON;
Query OK, 0 rows affected (0.00 sec)

View the configuration parameters:

mysql> show global variables like '%audit%'\G

*************************** 1. row ***************************

Variable_name: audit_before_after

        Value: after

*************************** 2. row ***************************

Variable_name: audit_checksum

        Value:

*************************** 3. row ***************************

Variable_name: audit_client_capabilities

        Value: OFF

*************************** 4. row ***************************

Variable_name: audit_delay_cmds

        Value:

*************************** 5. row ***************************

Variable_name: audit_delay_ms

        Value: 0

*************************** 6. row ***************************

Variable_name: audit_force_record_logins

        Value: OFF

*************************** 7. row ***************************

Variable_name: audit_header_msg

        Value: ON

*************************** 8. row ***************************

Variable_name: audit_json_file

        Value: ON

*************************** 9. row ***************************

Variable_name: audit_json_file_bufsize

        Value: 1

*************************** 10. row ***************************

Variable_name: audit_json_file_flush

        Value: OFF

*************************** 11. row ***************************

Variable_name: audit_json_file_retry

        Value: 60

*************************** 12. row ***************************

Variable_name: audit_json_file_sync

        Value: 0

*************************** 13. row ***************************

Variable_name: audit_json_log_file

        Value: mysql-audit.json

*************************** 14. row ***************************

Variable_name: audit_json_socket

        Value: OFF

*************************** 15. row ***************************

Variable_name: audit_json_socket_name

        Value: /var/run/db-audit/mysql.audit__var_lib_mysql_3306

*************************** 16. row ***************************

Variable_name: audit_json_socket_retry

        Value: 10

*************************** 17. row ***************************

Variable_name: audit_json_socket_write_timeout

        Value: 1000

*************************** 18. row ***************************

Variable_name: audit_offsets

        Value:

*************************** 19. row ***************************

Variable_name: audit_offsets_by_version

        Value: ON

*************************** 20. row ***************************

Variable_name: audit_password_masking_cmds

        Value: CREATE_USER,GRANT,SET_OPTION,SLAVE_START,CREATE_SERVER,ALTER_SERVER,CHANGE_MASTER,UPDATE

*************************** 21. row ***************************

Variable_name: audit_password_masking_regex

        Value: identified(?:/\*.*?\*/|\s)*?by(?:/\*.*?\*/|\s)*?(?:password)?(?:/\*.*?\*/|\s)*?['|"](?<psw>.*?)(?<!\\)['|"]|password(?:/\*.*?\*/|\s)*?\((?:/\*.*?\*/|\s)*?['|"](?<psw>.*?)(?<!\\)['|"](?:/\*.*?\*/|\s)*?\)|password(?:/\*.*?\*/|\s)*?(?:for(?:/\*.*?\*/|\s)*?\S+?)?(?:/\*.*?\*/|\s)*?=(?:/\*.*?\*/|\s)*?['|"](?<psw>.*?)(?<!\\)['|"]|password(?:/\*.*?\*/|\s)*?['|"](?<psw>.*?)(?<!\\)['|"]

*************************** 22. row ***************************

Variable_name: audit_record_cmds

        Value:

*************************** 23. row ***************************

Variable_name: audit_record_objs

        Value:

*************************** 24. row ***************************

Variable_name: audit_socket_creds

        Value: ON

*************************** 25. row ***************************

Variable_name: audit_uninstall_plugin

        Value: OFF

*************************** 26. row ***************************

Variable_name: audit_validate_checksum

        Value: ON

*************************** 27. row ***************************

Variable_name: audit_validate_offsets_extended

        Value: ON

*************************** 28. row ***************************

Variable_name: audit_whitelist_cmds

        Value: BEGIN,COMMIT,PING

*************************** 29. row ***************************

Variable_name: audit_whitelist_users

        Value:

29 rows in set (0.00 sec)

Commonly used parameters:

1) audit_json_file

Whether auditing is enabled.

2) audit_json_log_file

Path of the log file. By default it is created in the MySQL data directory; check the data directory with:

mysql> show variables like 'datadir';
+---------------+-----------------+
| Variable_name | Value           |
+---------------+-----------------+
| datadir       | /var/lib/mysql/ |
+---------------+-----------------+
1 row in set (0.00 sec)

3) audit_record_cmds

Which command types to record; by default all commands are recorded.

mysql> set global audit_record_cmds='select,insert,update,delete';
Query OK, 0 rows affected (0.00 sec)

Back to the default:

mysql> set global audit_record_cmds = NULL;
Query OK, 0 rows affected (0.00 sec)

4) audit_record_objs

Which objects to record; by default all objects.

mysql> set global audit_record_objs = 'test.*';
Query OK, 0 rows affected (0.00 sec)

Back to the default:

mysql> set global audit_record_objs = NULL;
Query OK, 0 rows affected (0.00 sec)

5) audit_whitelist_users

User whitelist.

As a quick test, running show databases; in MySQL produces a log entry like:

{"msg-type":"activity","date":"1502854234067","thread-id":"2","query-id":"15","user":"root","priv_user":"root","host":"localhost","pid":"14373","os_user":"root","appname":"mysql","rows":"5","cmd":"show_databases","objects":[{"db":"information_schema","name":"/tmp/#sql_37a1_0","obj_type":"TABLE"}],"query":"show databases"}

 

0x03 Traffic mirroring

Mirror both directions of the traffic on the DB switch port; this does not touch the existing network architecture. Two tools are described here.

1. Packetbeat

Packetbeat sniffs the network traffic between application servers, decodes application-layer protocols such as HTTP, MySQL and Redis, correlates requests with responses, and records the meaningful fields of each transaction. It can be deployed on the MySQL server itself or on a traffic-mirror server. Deployment:

yum -y install libpcap
./packetbeat -c packetbeat.yml

packetbeat.yml is the configuration file; packetbeat.template.json is the Elasticsearch mapping template.
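
A minimal packetbeat.yml sketch for MySQL capture (this follows the older Packetbeat 1.x layout in use at the time; newer versions prefix these sections with packetbeat., and the interface and Elasticsearch host here are illustrative):

cat > packetbeat.yml <<'EOF'
interfaces:
  device: eth0
protocols:
  mysql:
    ports: [3306]
output:
  elasticsearch:
    hosts: ["10.59.0.248:9200"]
EOF
./packetbeat -c packetbeat.yml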

Test:

mysql> select host,user from mysql.user;

+-----------+------+
| host      | user |
+-----------+------+
| %         | root |
| %         | soc  |
| 127.0.0.1 | root |
| localhost |      |
| localhost | soc  |
+-----------+------+
5 rows in set (0.00 sec)

The document sent to Elasticsearch contains, among other fields:

query: the SQL statement that was executed
num_fields: number of fields returned
num_rows: number of rows in the result

2. MySQL Sniffer

MySQL Sniffer is a packet-capture tool built on the MySQL protocol. It captures requests on the MySQL server or client side in real time and prints them in a formatted way, including access time, user, source IP, database, command duration, number of rows returned, and the statement itself. It can capture multiple ports at once, run in the background, and rotate its logs; it is easy to operate and its output is friendly.

Install the build dependencies:

yum install glib2-devel libpcap-devel libnet-devel

Project page:

https://github.com/Qihoo360/mysql-sniffer

Build steps:

cd mysql-sniffer
mkdir proj
cd proj
cmake ../
make
cd bin/

Usage:

[root@server120 bin]# ./mysql-sniffer -h
Usage ./mysql-sniffer [-d] -i eth0 -p 3306,3307,3308 -l /var/log/mysql-sniffer/ -e stderr
         [-d] -i eth0 -r 3000-4000
         -d daemon mode.
         -s how often to split the log file(minute, eg. 1440). if less than 0, split log everyday
         -i interface. Default to eth0
         -p port, default to 3306. Multiple ports should be splited by ','. eg. 3306,3307
            this option has no effect when -f is set.
         -r port range, Don't use -r and -p at the same time
         -l query log DIRECTORY. Make sure that the directory is accessible. Default to stdout.
         -e error log FILENAME or 'stderr'. if set to /dev/null, runtime error will not be recorded
         -f filename. use pcap file instead capturing the network interface
         -w white list. dont capture the port. Multiple ports should be splited by ','.
         -t truncation length. truncate long query if it's longer than specified length. Less than 0 means no truncation
         -n keeping tcp stream count, if not set, default is 65536. if active tcp count is larger than the specified count, mysql-sniffer will remove the oldest one

Test:

[root@server120 bin]# ./mysql-sniffer -i lo -p 3306
2017-08-16 13:56:04  root     127.0.0.1     NULL     0ms     1  select @@version_comment limit 1
2017-08-16 14:01:56  root     127.0.0.1     NULL     0ms     1  SELECT DATABASE()
2017-08-16 14:01:56  root     127.0.0.1     mysql    0ms     0  use mysql
2017-08-16 14:01:56  root     127.0.0.1     mysql    0ms     5  show databases
2017-08-16 14:01:56  root     127.0.0.1     mysql    0ms    23  show tables
2017-08-16 14:02:04  root     127.0.0.1     mysql    0ms     8  select * from user

Output columns: time, user, source IP, database, command duration, rows returned, statement.

The saved logs can then be shipped with filebeat:

[root@server120 bin]# ./mysql-sniffer -i eth0 -p 3306 -l /tmp/
[root@server120 tmp]# head -n 5 3306.log
2017-08-16 14:04:58  root     192.168.190.201       NULL    0ms    0  SET NAMES utf8
2017-08-16 14:04:58  root     192.168.190.201       NULL    0ms    2  SHOW VARIABLES LIKE 'lower_case_%'
2017-08-16 14:04:58  root     192.168.190.201       NULL    0ms    1  SHOW VARIABLES LIKE 'profiling'
2017-08-16 14:04:58  root     192.168.190.201       NULL    0ms    5  SHOW DATABASES
2017-08-16 14:05:20  root     192.168.190.201       NULL    0ms    0  SET NAMES utf8

-l sets the log output directory; log files are named port.log (here 3306.log).
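
A sketch of a filebeat prospector for shipping these logs (filebeat 1.x-style YAML; newer versions use filebeat.inputs, and the Logstash host is illustrative):

cat >> /etc/filebeat/filebeat.yml <<'EOF'
filebeat:
  prospectors:
    - paths:
        - /tmp/3306.log
      input_type: log
      document_type: mysql-sniffer
output:
  logstash:
    hosts: ["10.59.0.58:5044"]
EOF
service filebeat restart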

Caveats:

It can only capture connections created after it starts; for connections established earlier it cannot recover the user name or database name, and there is some chance of packet loss.

PS:

While testing with DVWA, a colleague found that no traffic on port 3306 was being captured. The reason was that DVWA was configured with localhost as the database host, and localhost is not the same as 127.0.0.1:

with MySQL, localhost connects through the unix domain socket, so nothing ever appears on a network interface and there is nothing to sniff;

127.0.0.1 connects over TCP through the loopback interface (which is why the test above sniffed on -i lo), and that traffic can be captured and filtered like any other.

 

0x04 Detecting database dumping

Based on the characteristics of SQL injection, the collected logs can be analyzed from a few angles:

1) A QPS baseline: traffic is usually low in the small hours, so a sudden spike in the number of logged queries is a strong hint that a dump is in progress.

2) Signature matching, much like a WAF: MySQL injection typically queries information_schema; sqlmap's dump statements lean heavily on IFNULL, ORD, MID and CAST; time-based blind injection uses sleep and benchmark and shows long command durations; error-based injection uses floor and updatexml, and so on. When the logs are collected with Logstash, tag any entry that matches such a rule, and use ElastAlert to alert when the number of tagged entries per unit of time exceeds a threshold (a rule sketch follows below).

This is of course a generic approach; once you know the business well you can add more precise detection rules.
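
A minimal ElastAlert frequency-rule sketch for the second approach (index pattern, tag value, threshold and mail address are all illustrative and depend on how your Logstash pipeline tags the events):

cat > rules/sql_dump_detect.yaml <<'EOF'
name: possible database dump
type: frequency
index: mysql-audit-*
num_events: 50
timeframe:
  minutes: 5
filter:
- term:
    tags: "sqli_signature"
alert:
- "email"
email:
- "secops@example.com"
EOF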

 

 

[Enterprise security in practice] Auditing operations Bash commands

0x01 Overview

Collecting Bash commands is mainly about getting centralized auditing of operations activity when there is no bastion or jump host, and also about monitoring the reverse bash/sh shells an attacker may spawn after compromising a server. Two syslog-based collection methods are described here.

0x02 Sending syslog by patching the Bash source

Since bash 4.1, bash can log command history to syslog. Download bash-4.4 from https://ftp.gnu.org/gnu/bash/

The steps are as follows:
1)
Edit bashhist.c:
Change line 771 to:

syslog (SYSLOG_FACILITY|SYSLOG_LEVEL, "PID=%d UID=%d User=%s Cmd=%s", getpid(), current_user.uid, current_user.user_name, line);

and change line 776 to:

syslog (SYSLOG_FACILITY|SYSLOG_LEVEL, "PID=%d UID=%d User=%s Cmd=%s", getpid(), current_user.uid, current_user.user_name, trunc);

2)
Then edit config-top.h,
removing the /* */ around line 116 so that SYSLOG_HISTORY is defined:

#define SYSLOG_HISTORY 
#if defined (SYSLOG_HISTORY)
# define SYSLOG_FACILITY LOG_LOCAL6
# define SYSLOG_LEVEL LOG_NOTICE
# define OPENLOG_OPTS LOG_PID
#endif

The syslog facility is LOCAL6 and the log level is NOTICE.
3)

./configure --prefix=/usr/local/bash && make && make install 
[root@localhost ~]# cp /bin/bash /bin/bashbak 
[root@localhost ~]# \cp -f /usr/local/bash/bin/bash /bin/bash

4)
Edit the rsyslog configuration; as a first test, write to a local file:

[root@zaojiasys_31 admin]# touch /var/log/bash.log
[root@zaojiasys_31 admin]# vim /etc/rsyslog.conf

Add the line: local6.notice    /var/log/bash.log

[root@zaojiasys_31 admin]# service rsyslog restart
[root@zaojiasys_31 admin]# tail -f /var/log/bash.log 
Jul 25 16:22:15 localhost bash[18540]: PID=18540 UID=0 User=root Cmd=tail -f /var/log/bash.log 
Jul 25 16:22:21 localhost bash[19033]: PID=19033 UID=0 User=root Cmd=whoami

5)
[root@zaojiasys_31 admin]# vim /etc/rsyslog.conf
Change the catch-all rule so local6 is excluded: *.info;mail.none;authpriv.none;cron.none;local6.none    /var/log/messages
Add: local6.notice @10.110.1.33:3514

On the ELK side, first configure Logstash:

input {
	syslog{  
		port => "3514"  
		type => "bash"
	}  
}

filter {
    grok{
        match => {
            "message" => "PID=%{NUMBER:processid} UID=%{NUMBER:uid} User=%{WORD:user} Cmd=%{GREEDYDATA:cmd}"
        }
    }
    mutate {
        remove_field => [ "message" ]
    }
}

output {
   if "_grokparsefailure" not in [tags] {
        elasticsearch {
            hosts => "10.110.1.33:9200"
            index => "bash_%{+YYYY.MM.dd}"
        }
    }
}

Add an Elasticsearch index template:

curl -XPUT 10.59.0.248:9200/_template/template_bash -d '
{
   "template": "bash_*", 
   "order" : 10,
   "settings" : {
      "number_of_shards": 5,
      "number_of_replicas": 0
   },
   "mappings" : {
    "_default_" : {
      "_all" : {"enabled" : true, "omit_norms" : true},
      "properties" : {
        "host": { "type": "keyword"},
        "cmd": { "type": "keyword"},
        "user": { "type": "keyword"},
        "uid": { "type": "integer"},
        "processid": { "type": "integer"}
      }
    }
  }
}

Then check the results in Kibana (screenshot not included here).

[root@server120 ~]# ll /bin/sh
lrwxrwxrwx. 1 root root 4 Mar 21 2016 /bin/sh -> bash

/bin/sh is a symlink to /bin/bash, so commands run through /bin/sh are audited as well.
Also disable the other shells:

# chmod 750 /bin/csh
# chmod 750 /bin/tcsh
# chmod 750 /bin/dash

PS:
Note that if the GLIBC version on the target host does not match the one the new bash was built against, bash will fail to start:

[root@localhost ~]# /bin/bash
/bin/bash: /lib64/libc.so.6: version `GLIBC_2.14' not found (required by /bin/bash)

0x03 Sending syslog with logger

Client-side configuration:

Add this line to /etc/profile:

[ -f /etc/zb_bashrc ] && . /etc/zb_bashrc

The content of /etc/zb_bashrc is:

if [ "$BASH" ]; then

    SSH_USER=$(who -mu  |awk '{print $1}')

    if [ -z "$SSH_USER" ]; then

        SSH_USER="-"

    fi



    SSH_CLIENT_IP=$(echo $SSH_CONNECTION |awk '{print $1}')

       if [ -z "$SSH_CLIENT_IP" ]; then

              SSH_CLIENT_IP="-"

       fi



    SSH_SERVER_IP=$(echo $SSH_CONNECTION |awk '{print $3}')

       if [ -z "$SSH_SERVER_IP" ]; then

              SSH_SERVER_IP="-"

       fi



    if [ -f "$HOME/.bash_history" ]; then

        export HISTFILE="$HOME/.bash_history"

    else

        touch $HOME/.bash_history

        chmod 600 $HOME/.bash_history

        export HISTFILE="$HOME/.bash_history"

    fi

    export HISTSIZE=100

    export HISTCONTROL=""

    export HISTIGNORE=""

    export HISTCMD

    export HISTTIMEFORMAT="%Y-%m-%d %T :: "

    PROMPT_COMMAND='history -a;history 1 | xargs -0 -i logger -p local6.notice $SSH_CLIENT_IP $SSH_SERVER_IP $SSH_USER $USER{}'

    readonly PROMPT_COMMAND

    readonly HISTTIMEFORMAT

    readonly HISTSIZE

    readonly HISTFILE

    readonly HISTIGNORE

    readonly HISTCONTROL

fi

Run source /etc/profile.

Then adjust the rsyslog configuration:

Add: local6.notice @10.211.0.107
Change the catch-all rule so local6 is excluded: *.info;mail.none;authpriv.none;cron.none;local6.none    /var/log/messages

service rsyslog restart

 

Problems encountered along the way:

1. Duplicate log collection

Entries were being collected repeatedly (screenshot not included here). Add this rsyslog directive:

$RepeatedMsgReduction on

Note that it must appear above the forwarding rule to take effect.

With it enabled, repeats show up as "last message repeated X times" lines; add a Logstash filter to drop those:

if ( "last message" in [message] ) {
       drop{}
}

The final Logstash configuration:

input {
    udp {
        port => "514"
        workers => "10"
    }
}

filter {
    if ("last message" in [message]) {
        drop{}
    }
    dissect {
        mapping => {
            "message" => '<%{pri}>%{} %{} %{} %{} %{}: %{client_ip} %{server_ip} %{ssh_user} %{user}  %{line}  %{timestamp} :: %{command}'
        }
    }
    date {
        match => ["timestamp", "yyyy-MM-dd HH:mm:ss"]
        target => "@timestamp"
        "locale" => "en"
        timezone => "UTC"
    }
}

output {
    redis {
        host => "127.0.0.1"
        data_type => "list"
        key => "command"
        db => "2"
    }
}

2. Commands run via sh are not recorded

/bin/sh is a symlink to /bin/bash, but when invoked as sh it does not read these startup files, so its commands are not logged.

 

0x04 Mass deployment with SaltStack

The first method (the patched bash) is the recommended one; a state template for it follows:

install_bash:
  cmd.run:
    - name: tar zxvf bash.tar.gz && cd bash && ./configure --prefix=/usr/local/bash && make && make install
    - cwd: /root/Downloads
    - unless: test -e /usr/local/bash/bin/bash
    - require:
      - file: /root/Downloads/bash.tar.gz

/root/Downloads/bash.tar.gz:
  file.managed:
    - source: salt://audit/bash.tar.gz
    - user: root
    - group: root
    - mode: 755
    - template: jinja
    - require:
      - file: /root/Downloads

bak_bash:
  cmd.run:
    - name: cp /bin/bash /bin/bashbak
    - unless: test -e /bin/bashbak
    - require:
      - cmd: install_bash

cp_bash:
  cmd.run:
    - name: \cp -f /usr/local/bash/bin/bash /bin/bash
    - require:
      - cmd: bak_bash

/etc/rsyslog.conf:
  file.append:
    - text: "local6.notice                                               @10.59.0.58:3514"
    - unless: grep '10.59.0.58' /etc/rsyslog.conf

rsyslog:
  service.running:
    - enable: True
    - restart: True
    - watch:
      - file: /etc/rsyslog.conf
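
Assuming the template above is saved as /srv/salt/audit/bash_audit.sls (path and target pattern are illustrative), it can be rolled out with:

salt 'db*' state.sls audit.bash_audit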


Using auditd on Linux to monitor commands executed by Java, with OSSEC alerting

0x01 About the auditd service
auditd is the audit system that ships with Linux. It records audit events and, from a security point of view, can be used to monitor security-relevant activity on the system.
Its rules live in /etc/audit/audit.rules, one rule or watch per line. The syntax is:

-a <list>,<action> <options>

<list> can be one of:

task
The per-task list. Used only at task creation; only fields known at creation time (such as the UID) can be used in this list.
entry
The syscall entry list. Evaluated when entering a system call to decide whether an audit record should be created.
exit
The syscall exit list. Evaluated when leaving a system call to decide whether an audit record should be created.
user
The user message filter list. The kernel uses it to filter user-space events before passing them to the audit daemon. The only valid fields are uid, auid, gid and pid.
exclude
The event type exclusion filter list. Used to suppress events the administrator does not want to see; the msgtype field specifies which message types not to log.

<action> can be one of:

never
Do not generate an audit record.
always
Allocate an audit context, always fill it in at syscall entry, and always write an audit record at syscall exit. If a program uses this system call, an audit record is started.

<options> include:

-S <syscall>
Specify a system call by name or number. Use all to match every system call.
-F <name[=,!=,<,>,<=]value>
Specify a rule field. If several fields are given for one rule, the record is only triggered when all of them match. Each field must be introduced with its own -F, and up to 64 fields can be specified per rule.
Commonly used fields:
pid
Process ID.
ppid
Parent process ID.
uid
User ID.
gid
Group ID.
msgtype
Message type number; only valid on the exclude filter list.
arch
Processor architecture of the system call. Give an exact architecture such as i686 (see uname -m), or b32 / b64 to select the 32-bit or 64-bit syscall table.
...
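
Rules can also be loaded and checked on the fly with auditctl, and matching records pulled out of /var/log/audit/audit.log with ausearch; a short sketch using the same key as the rule in the next section:

auditctl -a exit,always -F arch=b32 -F uid=99 -S execve -k webshell   # load the rule without editing audit.rules
auditctl -l                                                           # list the rules currently loaded
ausearch -k webshell -i                                               # show matching records, interpreted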

 

0x02 Writing and testing a rule that monitors Java command execution
JBoss runs as the nobody account (UID 99), so add audit rules keyed on that UID:

# grep '\-a' /etc/audit/audit.rules 
-a exclude,always -F msgtype=CONFIG_CHANGE
-a exit,always -F arch=b32 -F uid=99 -S execve -k webshell

Restart the service:

# service auditd restart
Stopping auditd: [ OK ]
Starting auditd: [ OK ]

Test with webshells:
1) China Chopper (caidao) webshell
The parameters Chopper sends are:

tom=M&z0=GB2312&z1=-c/bin/sh&z2=cd /;whoami;echo [S];pwd;echo [E]

The JSP code that gets executed:

else if(Z.equals("M")){String[] c={z1.substring(2),z1.substring(0,2),z2};Process p=Runtime.getRuntime().exec(c);

The resulting audit log entry:

type=EXECVE msg=audit(1500273887.809:7496): argc=3 a0="/bin/sh" a1="-c" a2=6364202F7765622F70726F6A6563742F7A616F6A69617379732E6A69616E73686539392E636F6D2E636563616F707379732F636563616F707379732F3B77686F616D693B6563686F205B535D3B7077643B6563686F205B455D

2) JspSpy
The parameters JspSpy sends are:

o=shell&type=command&command=netstat+-antlp&submit=Execute

所执行的程序如下:

String type = request.getParameter("type");
if (type.equals("command")) {
ins.get("vs").invoke(request,response,JSession);
out.println("<div style='margin:10px'><hr/>");
out.println("<pre>");
String command = request.getParameter("command");
if (!Util.isEmpty(command)) {
Process pro = Runtime.getRuntime().exec(command);
BufferedReader reader = new BufferedReader(new InputStreamReader(pro.getInputStream()));
String s = reader.readLine();

The resulting audit log entry:

type=EXECVE msg=audit(1500273958.180:7500): argc=1 a0="whoami"

 

0x03 OSSEC monitoring configuration
OSSEC already ships with a decoder for auditd events, for example:

<decoder name="auditd">
  <prematch>^type=</prematch>
</decoder>
.......

but there is no ready-made rule for them in the rules directory, so edit local_rules.xml and add:

<group name="syslog,auditd,">
  <rule id="110000" level="0" noalert="1">
    <decoded_as>auditd</decoded_as>
    <description>AUDITD messages grouped.</description>
  </rule>
  <rule id="110001" level="10">
    <if_sid>110000</if_sid>
    <match>EXECVE</match>
    <description>Java execution command</description>
  </rule>
</group>

Test:

[root@localhost ossec]# ./bin/ossec-logtest 
2017/07/17 16:28:26 ossec-testrule: INFO: Reading local decoder file.
2017/07/17 16:28:26 ossec-testrule: INFO: Started (pid: 9463).
ossec-testrule: Type one log per line.

type=EXECVE msg=audit(1500273958.180:7500): argc=1 a0="whoami"


**Phase 1: Completed pre-decoding.
       full event: 'type=EXECVE msg=audit(1500273958.180:7500): argc=1 a0="whoami"'
       hostname: 'localhost'
       program_name: '(null)'
       log: 'type=EXECVE msg=audit(1500273958.180:7500): argc=1 a0="whoami"'

**Phase 2: Completed decoding.
       decoder: 'auditd'

**Phase 3: Completed filtering (rules).
       Rule id: '110001'
       Level: '10'
       Description: 'Java execution command'
**Alert to be generated.

Then add the audit log as a monitored file on the agent side (ossec.conf):

  <localfile>
    <log_format>syslog</log_format>
    <location>/var/log/audit/audit.log</location>
  </localfile>
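
After editing local_rules.xml on the server and ossec.conf on the agent, both sides need a restart for the changes to take effect (assuming a default /var/ossec install):

/var/ossec/bin/ossec-control restart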

Now when JspSpy executes a system command, an alert like the following shows up:

[root@localhost ossec]# tail -f /var/ossec/logs/alerts/alerts.log 
** Alert 1500280231.400419: mail  - syslog,auditd,
2017 Jul 17 16:30:31 (agent-31) 10.110.1.31->/var/log/audit/audit.log
Rule: 110001 (level 10) -> 'Java execution command'
type=EXECVE msg=audit(1500280229.507:7665): argc=1 a0="pwd"

One more thing to consider is a whitelist: some of the company's sites legitimately invoke system commands (for example for video processing), so to avoid false positives a whitelist is needed.
Modify local_rules.xml again, adding a whitelist rule and placing it above the EXECVE rule:

<group name="syslog,auditd,">
  <rule id="110000" level="0" noalert="1">
    <decoded_as>auditd</decoded_as>
    <description>AUDITD messages grouped.</description>
  </rule>
  <rule id="110001" level="0">
    <if_sid>110000</if_sid>
    <regex>whoami|passwd</regex>
    <description>Java execution white list</description>
  </rule>
  <rule id="110002" level="10">
    <if_sid>110000</if_sid>
    <match>EXECVE</match>
    <description>Java execution command</description>
  </rule>
</group>

With this in place, running whoami or cat /etc/passwd no longer produces an alert.

 

MSSQL database honeypot testing

Introduction

On MSSQL 2008 R2 you can use the built-in Audit feature: create a honeypot table in every production database and audit insert/update/delete/select operations against it; MSSQL writes the audit records to a file.

Test

The test connects to the database with the China Chopper (caidao) webshell. The request it sends to enumerate the databases is:

tom=N&z0=GB2312&z1=com.microsoft.sqlserver.jdbc.SQLServerDriver
jdbc:sqlserver://192.168.***.***:1433;databaseName=***;user=sa;password=***&z2=

The corresponding JSP code is:

else if(Z.equals("N")){NN(z1,sb);}

It calls the NN function:

void NN(String s,StringBuffer sb)throws Exception{
Connection c=GC(s);
ResultSet r=c.getMetaData().getCatalogs();
while(r.next()){
sb.append(r.getString(1)+"\t");
}
r.close();
c.close();
}

getMetaData().getCatalogs() returns the list of all databases.

When a database is double-clicked to list its tables, Chopper sends the following request:

tom=O&z0=GB2312&z1=com.microsoft.sqlserver.jdbc.SQLServerDriver
jdbc:sqlserver://192.168.***.***:1433;databaseName=***;user=sa;password=***
kefu&z2=

which calls OO(z1,sb);

void OO(String s,StringBuffer sb)throws Exception{
Connection c=GC(s);
String[] t={"TABLE"};
ResultSet r=c.getMetaData().getTables (null,null,"%",t);
while(r.next()){
sb.append(r.getString("TABLE_NAME")+"\t");
}r.close();c.close();}

OO calls getTables, and the SQL that is executed on the server is:

select
            TABLE_QUALIFIER = convert(sysname,db_name()),
            TABLE_OWNER     = convert(sysname,schema_name(o.schema_id)),
            TABLE_NAME      = convert(sysname,o.name),
            TABLE_TYPE      = convert(varchar(32),
                                        rtrim(substring('SYSTEM TABLE            TABLE       VIEW       ',
                                              (ascii(o.type)-83)*12+1,
                                              12))  -- 'S'=0,'U'=2,'V'=3
                                     ),
            REMARKS = convert(varchar(254),null)    -- Remarks are NULL.

        from
            sys.all_objects o

        where
            o.type in ('S','U','V') and
            has_perms_by_name(quotename(schema_name(o.schema_id)) + '.' + quotename(o.name),
                              'object',
                              'select') = 1 and
            charindex(substring(o.type,1,1),@type1) <> 0 and -- Only desired types.
            (@table_name  is NULL or o.name like @table_name) and
            (@table_owner is NULL or schema_name(o.schema_id) like @table_owner)
        order by 4, 1, 2, 3

has_perms_by_name(quotename(schema_name(o.schema_id)) + '.' + quotename(o.name), 'object', 'select') = 1

checks whether the caller has SELECT permission on each object, which touches the honeypot table and triggers the alert.

PS:
The company's DBAs use a tool that exports data from MSSQL to MySQL; simply browsing tables in that tool can trigger the same alert and produce false positives.