Support Vector Machines(SVM)
langu_xyz

0x01 SVM

实现算法:序列最小优化(SMO)

支持向量:离分割超平面最近的那些点

优点:

  • 范化错误率低
  • 计算开销不大
  • 结果易解释

缺点:

  • 对参数调节和核函数的选择敏感
  • 原始分类器需要修改才能处理多分类问题

适用数据类型:

  • 数值型
  • 标称型

0x02 SMO

将大优化问题分解为多个小优化问题来求解

目标是求出一系列alpha和b,一旦求出了这些alpha,就很容易计算出权重向量w并得到分隔超平面

工作原理是:每次循环中选择两个alpha进行优化处理。一旦找到一对合适的alpha,那么就增大其中一个同时减小另一个

  • 简化版实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
def selectJrand(i, m):
'''
在某个区间范围内随机选择一个整数
:param i: 第i个alpha的下标
:param m: 所有alpha的数目
:return:
'''
j=i
while (j==i):
j = int(random.uniform(0, m))
return j

def clipAlpha(aj, H, L):
'''
用于调整大于H或小于L的alpha值
:param aj:
:param H:
:param L:
:return:
'''
if aj > H:
aj = H
if L > aj:
aj = L
return aj

def smoSimple(dataMatIn, classLabels, C, toler, maxIter):
'''

:param dataMatIn:数据集
:param classLabels:类别标签
:param C:常数C
:param toler:容错率
:param maxIter:退出前的最大循环次数
:return:
'''
dataMatrix = mat(dataMatIn)
labelMat = mat(classLabels).transpose()
b = 0
m, n = shape(dataMatrix)
alphas = mat(zeros((m, 1)))
iter = 0 #记录循环次数
while (iter < maxIter):
alphaPairsChanged = 0 #用于记录alpha是否优化
for i in range(m):
fXi = float(multiply(alphas, labelMat).T*(dataMatrix*dataMatrix[i, :].T)) + b #预测的类别
Ei = fXi - float(labelMat[i])#预测和实际的误差
#如果误差过大,则进行优化
if ((labelMat[i]*Ei < -toler) and (alphas[i] < C)) or ((labelMat[i]*Ei > toler) and (alphas[i] > 0)):
j = selectJrand(i, m) #随机选择第二个alpha值
fXj = float(multiply(alphas, labelMat).T*(dataMatrix*dataMatrix[j, :].T)) + b #预测类别
Ej = fXj - float(labelMat[j]) #误差
alphaIold = alphas[i].copy()
alphaJold = alphas[j].copy()
#计算L和H
if (labelMat[i] != labelMat[j]):
L = max(0, alphas[j] - alphas[i])
H = min(C, C + alphas[j] - alphas[i])
else:
L = max(0, alphas[j] + alphas[i] - C)
H = min(C, alphas[j] + alphas[i])
if L==H:
print("L==H")
continue
eta = 2.0 * dataMatrix[i, :]*dataMatrix[j, :].T - dataMatrix[i, :]*dataMatrix[i, :].T - dataMatrix[j, :]*dataMatrix[j, :].T #最优修改量
if eta >= 0:
print("eta>=0")
continue
alphas[j] -= labelMat[j]*(Ei - Ej)/eta
alphas[j] = clipAlpha(alphas[j], H, L)
if (abs(alphas[j] - alphaJold) < 0.00001):
print("j not moving enough")
continue
alphas[i] += labelMat[j]*labelMat[i]*(alphaJold - alphas[j])#修改i,修改量和j相同,方向相反
#设置常数项
b1 = b - Ei - labelMat[i]*(alphas[i]-alphaIold)*dataMatrix[i, :]*dataMatrix[i, :].T - labelMat[j]*(alphas[j]-alphaJold)*dataMatrix[i, :]*dataMatrix[j, :].T
b2 = b - Ej - labelMat[i]*(alphas[i]-alphaIold)*dataMatrix[i, :]*dataMatrix[j, :].T - labelMat[j]*(alphas[j]-alphaJold)*dataMatrix[j, :]*dataMatrix[j, :].T
if (0 < alphas[i]) and (C > alphas[i]):
b = b1
elif (0 < alphas[j]) and (C > alphas[j]):
b = b2
else:
b = (b1 + b2)/2.0
alphaPairsChanged += 1
print("iter: %d i:%d, pairs changed %d" % (iter, i, alphaPairsChanged))
if (alphaPairsChanged == 0):
iter += 1
else:
iter = 0
print("iteration number: %d" % iter)
return b, alphas

找出哪些点是支持向量

1
2
3
for i in range(100):
if alphas[i] > 0.0:
print(dateArr[i], labelArr[i])

  • 完整版实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
class optStruct:
def __init__(self, dataMatIn, classLabels, C, toler, kTup): # Initialize the structure with the parameters
self.X = dataMatIn
self.labelMat = classLabels
self.C = C
self.tol = toler
self.m = shape(dataMatIn)[0]
self.alphas = mat(zeros((self.m, 1)))
self.b = 0
self.eCache = mat(zeros((self.m, 2))) #first column is valid flag
self.K = mat(zeros((self.m, self.m)))
for i in range(self.m):
self.K[:, i] = kernelTrans(self.X, self.X[i, :], kTup)

def calcEk(oS, k):
'''
计算E值并返回
:param oS:
:param k:
:return:
'''
fXk = float(multiply(oS.alphas, oS.labelMat).T*oS.K[:, k] + oS.b)
Ek = fXk - float(oS.labelMat[k])
return Ek

def selectJ(i, oS, Ei):
'''
选择第二个alpha
:param i:
:param oS:
:param Ei:
:return:
'''
maxK = -1
maxDeltaE = 0
Ej = 0
oS.eCache[i] = [1, Ei]
validEcacheList = nonzero(oS.eCache[:, 0].A)[0]
if (len(validEcacheList)) > 1:
for k in validEcacheList:
if k == i:
continue
Ek = calcEk(oS, k)
deltaE = abs(Ei - Ek)
if (deltaE > maxDeltaE):
maxK = k
maxDeltaE = deltaE
Ej = Ek
return maxK, Ej
else:
j = selectJrand(i, oS.m)
Ej = calcEk(oS, j)
return j, Ej

def updateEk(oS, k):
'''
计算误差值并存入缓存
:param oS:
:param k:
:return:
'''
Ek = calcEk(oS, k)
oS.eCache[k] = [1, Ek]

def innerL(i, oS):
'''
优化过程
:param i:
:param oS:
:return:
'''
Ei = calcEk(oS, i)
if ((oS.labelMat[i]*Ei < -oS.tol) and (oS.alphas[i] < oS.C)) or ((oS.labelMat[i]*Ei > oS.tol) and (oS.alphas[i] > 0)):
j, Ej = selectJ(i, oS, Ei)
alphaIold = oS.alphas[i].copy(); alphaJold = oS.alphas[j].copy()
if (oS.labelMat[i] != oS.labelMat[j]):
L = max(0, oS.alphas[j] - oS.alphas[i])
H = min(oS.C, oS.C + oS.alphas[j] - oS.alphas[i])
else:
L = max(0, oS.alphas[j] + oS.alphas[i] - oS.C)
H = min(oS.C, oS.alphas[j] + oS.alphas[i])
if L==H:
print("L==H")
return 0
eta = 2.0 * oS.K[i, j] - oS.K[i, i] - oS.K[j, j]
if eta >= 0:
print("eta>=0")
return 0
oS.alphas[j] -= oS.labelMat[j]*(Ei - Ej)/eta
oS.alphas[j] = clipAlpha(oS.alphas[j], H, L)
updateEk(oS, j)
if (abs(oS.alphas[j] - alphaJold) < 0.00001):
print("j not moving enough")
return 0
oS.alphas[i] += oS.labelMat[j]*oS.labelMat[i]*(alphaJold - oS.alphas[j])
updateEk(oS, i)
b1 = oS.b - Ei - oS.labelMat[i]*(oS.alphas[i]-alphaIold)*oS.K[i, i] - oS.labelMat[j]*(oS.alphas[j]-alphaJold)*oS.K[i, j]
b2 = oS.b - Ej - oS.labelMat[i]*(oS.alphas[i]-alphaIold)*oS.K[i, j] - oS.labelMat[j]*(oS.alphas[j]-alphaJold)*oS.K[j, j]
if (0 < oS.alphas[i]) and (oS.C > oS.alphas[i]):
oS.b = b1
elif (0 < oS.alphas[j]) and (oS.C > oS.alphas[j]):
oS.b = b2
else:
oS.b = (b1 + b2)/2.0
return 1
else:
return 0

def smoP(dataMatIn, classLabels, C, toler, maxIter,kTup=('lin', 0)):
'''
完整的SMO外循环
:param dataMatIn:
:param classLabels:
:param C:
:param toler:
:param maxIter:
:param kTup:
:return:
'''
oS = optStruct(mat(dataMatIn), mat(classLabels).transpose(), C, toler, kTup)
iter = 0
entireSet = True
alphaPairsChanged = 0
while (iter < maxIter) and ((alphaPairsChanged > 0) or (entireSet)):
alphaPairsChanged = 0
if entireSet:
for i in range(oS.m):
alphaPairsChanged += innerL(i,oS)
print("fullSet, iter: %d i:%d, pairs changed %d" % (iter, i, alphaPairsChanged))
iter += 1
else:
nonBoundIs = nonzero((oS.alphas.A > 0) * (oS.alphas.A < C))[0]
for i in nonBoundIs:
alphaPairsChanged += innerL(i, oS)
print("non-bound, iter: %d i:%d, pairs changed %d" % (iter, i, alphaPairsChanged))
iter += 1
if entireSet:
entireSet = False
elif (alphaPairsChanged == 0):
entireSet = True
print("iteration number: %d" % iter)
return oS.b, oS.alphas

def calcWs(alphas, dataArr, classLabels):
'''
利用计算出的alpha进行分类
:param alphas:
:param dataArr:
:param classLabels:
:return:
'''
X = mat(dataArr)
labelMat = mat(classLabels).transpose()
m, n = shape(X)
w = zeros((n, 1))
for i in range(m):
w += multiply(alphas[i]*labelMat[i], X[i, :].T)
return w
1
2
3
4
5
6
7
for i in range(100):
if alphas[i] > 0.0:
print(dateArr[i], labelArr[i])
ws = calcWs(alphas, dateArr, labelArr)
print(ws)
datmat = mat(dateArr)
print(datmat[0]*mat(ws) + b)

最后一行为测试结果,小于0属于-1类,大于0属于1类,等于0属于-1类

0x03 kernel

  • 将数据映射到高维空间

将数据从一个特征空间转换到另一个特征空间
映射会将低维特征空间映射到高维空间

  • 径向基核函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
def kernelTrans(X, A, kTup):
'''
核函数
:param X:
:param A:
:param kTup:包含核函数信息的元组
:return:
'''
m, n = shape(X)
K = mat(zeros((m, 1)))
if kTup[0]=='lin':
K = X * A.T
elif kTup[0]=='rbf':
for j in range(m):
deltaRow = X[j, :] - A
K[j] = deltaRow*deltaRow.T
K = exp(K/(-1*kTup[1]**2))
else:
raise NameError('Houston We Have a Problem -- That Kernel is not recognized')
return K

测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
def testRbf(k1=1.3):
dataArr,labelArr = loadDataSet('testSetRBF.txt')
b,alphas = smoP(dataArr, labelArr, 200, 0.0001, 10000, ('rbf', k1))
datMat=mat(dataArr)
labelMat = mat(labelArr).transpose()
svInd=nonzero(alphas.A>0)[0]
sVs=datMat[svInd]
labelSV = labelMat[svInd]
print("there are %d Support Vectors" % shape(sVs)[0])
m, n = shape(datMat)
errorCount = 0
for i in range(m):
kernelEval = kernelTrans(sVs, datMat[i, :], ('rbf', k1))
predict = kernelEval.T * multiply(labelSV, alphas[svInd]) + b
if sign(predict) != sign(labelArr[i]):
errorCount += 1
print("the training error rate is: %f" % (float(errorCount)/m))
dataArr, labelArr = loadDataSet('testSetRBF2.txt')
errorCount = 0
datMat=mat(dataArr)
labelMat = mat(labelArr).transpose()
m,n = shape(datMat)
for i in range(m):
kernelEval = kernelTrans(sVs, datMat[i, :], ('rbf', k1))
predict = kernelEval.T * multiply(labelSV, alphas[svInd]) + b
if sign(predict) != sign(labelArr[i]):
errorCount += 1
print("the test error rate is: %f" % (float(errorCount)/m))

0x04 实例1

基于SVM的手写数字识别

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
def testDigits(kTup=('rbf', 10)):
dataArr,labelArr = loadImages('../Ch02/digits/trainingDigits')
b,alphas = smoP(dataArr, labelArr, 200, 0.0001, 10000, kTup)
datMat=mat(dataArr)
labelMat = mat(labelArr).transpose()
svInd=nonzero(alphas.A > 0)[0]
sVs=datMat[svInd]
labelSV = labelMat[svInd]
print("there are %d Support Vectors" % shape(sVs)[0])
m, n = shape(datMat)
errorCount = 0
for i in range(m):
kernelEval = kernelTrans(sVs, datMat[i, :], kTup)
predict = kernelEval.T * multiply(labelSV, alphas[svInd]) + b
if sign(predict) != sign(labelArr[i]):
errorCount += 1
print("the training error rate is: %f" % (float(errorCount)/m))
dataArr, labelArr = loadImages('testDigits')
errorCount = 0
datMat=mat(dataArr)
labelMat = mat(labelArr).transpose()
m, n = shape(datMat)
for i in range(m):
kernelEval = kernelTrans(sVs, datMat[i, :], kTup)
predict=kernelEval.T * multiply(labelSV, alphas[svInd]) + b
if sign(predict)!=sign(labelArr[i]):
errorCount += 1
print("the test error rate is: %f" % (float(errorCount)/m))

0x05 实例2

XSS Detection

0x01 数据

在github上看到https://github.com/SparkSharly/DL_for_xss 这个项目,感觉不错,学习一下,数据集项目中已经附带,就直接使用了

  • eg. normal_examples.csv (20w+取部分)

  • eg. xssed.csv (4W+取部分)

0x02 分词
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
def GeneSeg(payload):
#数字泛化为"0"
payload=payload.lower()
payload=unquote(unquote(payload))
payload,num=re.subn(r'\d+',"0",payload)
#替换url为”http://u
payload,num=re.subn(r'(http|https)://[a-zA-Z0-9\.@&/#!#\?]+', "http://u", payload)
#分词
r = '''
(?x)[\w\.]+?\(
|\)
|"\w+?"
|'\w+?'
|http://\w
|</\w+>
|<\w+>
|<\w+
|\w+=
|>
|[\w\.]+
'''
return nltk.regexp_tokenize(payload, r)

0x03 特征
  • 建立xss语义模型,构建词汇表

统计高频出现的300词构建词表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
words=[]
datas=[]
with open("data/xssed.csv","r",encoding="utf-8") as f:
reader=csv.DictReader(f,fieldnames=["payload"])
for row in reader:
payload=row["payload"]
word=GeneSeg(payload)
datas.append(word)
words+=word

#构建数据集
def build_dataset(datas,words):
count=[["UNK",-1]]
counter=Counter(words)
count.extend(counter.most_common(vocabulary_size-1))
#print(count)
vocabulary=[c[0] for c in count]
#print(vocabulary)
data_set=[]
for data in datas:
d_set=[]
for word in data:
if word in vocabulary:
d_set.append(word)
else:
d_set.append("UNK")
count[0][1]+=1
data_set.append(d_set)
print(data_set)
  • word2vec建模
1
model=Word2Vec(data_set,size=embedding_size,window=skip_window,negative=num_sampled,iter=num_iter)

空间维度设置为32维

查看建模结果,与</script>最语义最相近的词

  • 数据处理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
def pre_process():
with open(vec_dir,"rb") as f :
word2vec=pickle.load(f)
#词表('UNK': 0, '0': 1)
dictionary=word2vec["dictionary"]
#维度值
embeddings=word2vec["embeddings"]
#反向词表(num和word调换,0: 'UNK', 1: '0')
reverse_dictionary = word2vec["reverse_dictionary"]
xssed_data=[]
normal_data=[]
with open("data/xssed.csv","r",encoding="utf-8") as f:
reader = csv.DictReader(f, fieldnames=["payload"])
for row in reader:
payload=row["payload"]
#分词['search=', '</script>', '<img', 'src=', 'worksinchrome', 'colon', 'prompt', 'x0', '0', 'x0', 'onerror=', 'eval(', 'src', ')', '>']
word=GeneSeg(payload)
xssed_data.append(word)
with open("data/normal_examples.csv","r",encoding="utf-8") as f:
reader = csv.DictReader(f, fieldnames=["payload"])
for row in reader:
payload=row["payload"]
word=GeneSeg(payload)
normal_data.append(word)
xssed_num=len(xssed_data)
normal_num=len(normal_data)
#生成标签[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
xssed_labels=[1]*xssed_num
normal_labels=[0]*normal_num
datas=xssed_data+normal_data
labels=xssed_labels+normal_labels
def to_index(data):
d_index=[]
for word in data:
if word in dictionary.keys():
d_index.append(dictionary[word])
else:
d_index.append(dictionary["UNK"])
return d_index
#数据转换[23, 5, 34, 14, 0, 0, 0, 0, 1, 0, 81, 0, 0, 3, 2]
datas_index=[to_index(data) for data in datas]
#长度不足maxlen的用-1在前端填充
'''
[[ -1 -1 -1 ... 0 3 2]
[ -1 -1 -1 ... 10 17 1]
[ -1 -1 -1 ... 150 0 71]
...
[ -1 -1 -1 ... 11 2 55]
[ -1 -1 -1 ... 5 24 1]
[ -1 -1 -1 ... 1 3 5]]
'''
datas_index=pad_sequences(datas_index,value=-1,maxlen=maxlen)
#从有序列表中选k个作为一个片段返回,eg.[7, 6, 3, 2, 5, 8, 0, 1, 10, 4, 9]
rand=random.sample(range(len(datas_index)),len(datas_index))
#数据简单随机排序
datas=[datas_index[index] for index in rand]
labels=[labels[index] for index in rand]

datas_embed=[]
#获取UNK的维度,本例中是32
dims=len(embeddings["UNK"])
n=0
for data in datas:
data_embed = []
for d in data:
if d != -1:
#如果不是填充数据,就把真实纬度值替换
data_embed.extend(embeddings[reverse_dictionary[d]])
else:
data_embed.extend([0.0] * dims)
datas_embed.append(data_embed)
'''
[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0,
0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0,
0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0,··· -0.5644003, 0.41219762, -1.2313833, -1.3566964,
-0.74316794, -1.2668883, 1.0586963, 1.5969143, 0.21956278, 1.1538218, -0.35007623, 0.21183407,
-0.53830135, 1.7361579, -0.08175806, -1.1915175, -1.7790002, -1.1044971, 0.40857738]
'''
n+=1
if n%10000 ==0:
print(n)
#七成训练,三成测试
train_datas,test_datas,train_labels,test_labels=train_test_split(datas_embed,labels,test_size=0.3)
return train_datas,test_datas,train_labels,test_labels
0x04 SVM训练

通过SVM算法进行模型训练

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
train_datas, train_labels=pre_process()
print("Start Train Job! ")
start = time.time()
model=LinearSVC()
model = SVC(C=1.0, kernel="linear")
model.fit(train_datas,train_labels)
model.save(model_dir)
end = time.time()
print("Over train job in %f s" % (end - start))
print("Start Test Job!")
start=time.time()
pre=model.predict(test_datas)
end=time.time()
print("Over test job in %s s"%(end-start))
precision = precision_score(test_labels, pre)
recall = recall_score(test_labels, pre)
print("Precision score is :", precision)
print("Recall score is :", recall)
with open(model_dir,"wb") as f:
pickle.dump(model,f,protocol=2)
print("wirte to ",model_dir)

精确率和召回率:

  • Post title:Support Vector Machines(SVM)
  • Post author:langu_xyz
  • Create time:2019-07-22 21:00:00
  • Post link:https://blog.langu.xyz/Support Vector Machines(SVM)/
  • Copyright Notice:All articles in this blog are licensed under BY-NC-SA unless stating additionally.