mirror of
https://github.com/Tencent/Metis.git
synced 2025-12-25 19:52:49 +00:00
110 lines
4.9 KiB
Python
110 lines
4.9 KiB
Python
#!/usr/bin/env python
|
|
# -*- coding: UTF-8 -*-
|
|
"""
|
|
Tencent is pleased to support the open source community by making Metis available.
|
|
Copyright (C) 2018 THL A29 Limited, a Tencent company. All rights reserved.
|
|
Licensed under the BSD 3-Clause License (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
|
|
https://opensource.org/licenses/BSD-3-Clause
|
|
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
|
|
"""
|
|
|
|
import os
|
|
from time_series_detector.algorithm import isolation_forest, ewma, polynomial_interpolation, statistic, xgboosting
|
|
from time_series_detector.common.tsd_errorcode import *
|
|
from time_series_detector.common.tsd_common import *
|
|
# Prefix for model file names: the "model" directory next to this module.
# Model names are appended to it directly, so the trailing slash matters.
MODEL_PATH = os.path.join(
    os.path.dirname(__file__),
    './model/',
)
|
|
|
|
|
|
class Detect(object):
    """
    Run the time series detectors over a single detection request.

    A request is a dict whose ``dataA``, ``dataB`` and ``dataC`` entries are
    comma-separated number strings. An optional ``window`` entry overrides
    ``DEFAULT_WINDOW``.
    """

    def __init__(self):
        """Create one instance of each detector used by the predict methods."""
        self.iforest_obj = isolation_forest.IForest()
        self.ewma_obj = ewma.Ewma()
        self.polynomial_obj = polynomial_interpolation.PolynomialInterpolation()
        self.statistic_obj = statistic.Statistic()
        self.supervised_obj = xgboosting.XGBoosting()

    def __list_is_digit(self, data):
        """
        Tell whether every item of ``data`` can be converted by ``float()``.

        :param data: iterable of values, typically the pieces of a split string
        :return: True when all items convert (also for an empty iterable),
                 False at the first item that does not
        """
        for item in data:
            try:
                float(item)
            except (TypeError, ValueError):
                # TypeError covers items that are neither strings nor numbers
                # (e.g. None); they are not digits either.
                return False
        return True

    def __get_window(self, data):
        """
        Return the window of a request.

        :param data: request dict
        :return: ``data["window"]`` when the key is present, else ``DEFAULT_WINDOW``
        """
        # NOTE(review): the value is used in integer arithmetic below; this
        # assumes callers pass an int - confirm against the request parser.
        return data["window"] if "window" in data else DEFAULT_WINDOW

    def __check_param(self, data):
        """
        Validate a detection request.

        Checks, in order: presence of all required keys, non-empty series,
        numeric content of each series, and the length of each series
        (``2 * window + 1`` items for dataC and dataB, ``window + 1`` for dataA).

        :param data: request dict
        :return: ``(TSD_OP_SUCCESS, "")`` when the request is valid, otherwise
                 ``(TSD_CHECK_PARAM_FAILED, message)`` for the first failed check
        """
        required_keys = ("viewName", "attrId", "attrName", "time", "dataC", "dataB", "dataA")
        if any(key not in data for key in required_keys):
            return TSD_CHECK_PARAM_FAILED, "missing parameter"

        series_names = ("dataA", "dataB", "dataC")
        for name in series_names:
            if not data[name]:
                return TSD_CHECK_PARAM_FAILED, name + " can not be empty"

        # Split each series once; the pieces are reused by the length checks.
        pieces = {}
        for name in series_names:
            pieces[name] = data[name].split(',')
            if not self.__list_is_digit(pieces[name]):
                return TSD_CHECK_PARAM_FAILED, name + " contains illegal numbers"

        window = self.__get_window(data)
        expected_lengths = (
            ("dataC", 2 * window + 1),
            ("dataB", 2 * window + 1),
            ("dataA", window + 1),
        )
        for name, expected in expected_lengths:
            if len(pieces[name]) != expected:
                return TSD_CHECK_PARAM_FAILED, name + " length does not match"
        return TSD_OP_SUCCESS, ""

    def value_predict(self, data):
        """
        Predict the data

        The request is validated first. The series dataC, dataB and dataA are
        then joined in that order, converted to integers and handed to the
        statistic, EWMA, polynomial and isolation forest detectors. If any of
        them returns 0, the supervised model decides the final result;
        otherwise the result and its probability are both 1.

        :param data: the time series to detect of
        :return: ``(TSD_OP_SUCCESS, {"ret": result, "p": probability as str})``
                 on success, ``(TSD_CHECK_PARAM_FAILED, message)`` when the
                 request is invalid
        """
        ret_code, ret_data = self.__check_param(data)
        if ret_code != TSD_OP_SUCCESS:
            return ret_code, ret_data

        # NOTE(review): taskId is concatenated into a file path unchecked. If
        # it can come from an untrusted caller it should be restricted to a
        # safe character set before use.
        if "taskId" in data and data["taskId"]:
            model_name = MODEL_PATH + data["taskId"] + "_model"
        else:
            model_name = MODEL_PATH + "xgb_default_model"

        combined_data = data["dataC"] + "," + data["dataB"] + "," + data["dataA"]
        try:
            # Build a real list: the series is consumed by several detectors,
            # and on Python 3 a bare map() could only be iterated once.
            time_series = [int(value) for value in combined_data.split(',')]
        except ValueError:
            # __check_param accepts anything float() parses ("1.5", "nan"),
            # which int() rejects; report it instead of raising.
            return TSD_CHECK_PARAM_FAILED, "data contains non-integer numbers"

        window = self.__get_window(data)
        statistic_result = self.statistic_obj.predict(time_series)
        ewma_result = self.ewma_obj.predict(time_series)
        polynomial_result = self.polynomial_obj.predict(time_series, window)
        iforest_result = self.iforest_obj.predict(time_series, window)
        if statistic_result == 0 or ewma_result == 0 or polynomial_result == 0 or iforest_result == 0:
            xgb_result = self.supervised_obj.predict(time_series, window, model_name)
            res_value = xgb_result[0]
            prob = xgb_result[1]
        else:
            res_value = 1
            prob = 1
        ret_data = {"ret": res_value, "p": str(prob)}
        return TSD_OP_SUCCESS, ret_data

    def rate_predict(self, data):
        """
        Predict the data

        The series dataC, dataB and dataA are joined in that order, converted
        to floats and handed to the statistic detector only. The probability
        is 0 when the detector returns 0 and 1 otherwise.

        Unlike ``value_predict``, the request is not validated here: a missing
        key or a non-numeric value raises instead of returning an error code.

        :param data: the time series to detect of
        :return: ``(TSD_OP_SUCCESS, {"ret": detector result, "p": probability as str})``
        """
        combined_data = data["dataC"] + "," + data["dataB"] + "," + data["dataA"]
        # Materialised as a list so the detector gets a sequence on Python 3 too.
        time_series = [float(value) for value in combined_data.split(',')]
        statistic_result = self.statistic_obj.predict(time_series)
        prob = 0 if statistic_result == 0 else 1
        ret_data = {"ret": statistic_result, "p": str(prob)}
        return TSD_OP_SUCCESS, ret_data
|