订阅博客
收藏博客
微博分享
QQ空间分享

东风汽车,Apache Spark 完成可扩展日志剖析,发掘体系最大潜力(2),年会节目

频道:娱乐消息 标签:牛生殖器 时间:2019年05月20日 浏览:306次 评论:0条

在本节中,咱们将测验整理和解析日志数据集,以便真正从每个东风汽车,Apache Spark 完结可扩展日志剖析,开掘系统最大潜力(2),年会节目日志音讯中提取包括有意义信息的结构化特点。


日志数据了解

假如你了解 Web 服务器日志,你将认识到上面显现的数据是通用的日志格局。

咱们需求运用一些特定的技能来解析、匹配和提取日志数据中的这些特点。

运用正则表达式进行数据解析和提取

接下来,咱们必须将半结构化的日志数据解析为独自的列。咱们将运用专门的内置函数regexp_天天向上20130816extract()进行解析。此函数将针对具有一个或多个捕获组的正则表达式匹配列,并答应提取其间一个匹配的组。咱们将对期望提取的每个字段运用一个正则表达式。

到目前为止,你必定现已传闻或运用了许多正则表达式。假如你发现正则表达式令人困惑,而且期望了解更多关于正则表达式的信息,咱们主张你拜访RegexOne 网站。你或许还会发现,Goyvaerts 和 Le郭震洲自首vithan 编写的《正则表达式手册》对错常有用的参考资料。

让咱们看下咱们运用的数据会集的日志总数。

print((base_df.count(), len(base_df.columns)))

#Output

(3461613, 1)

看起来咱们总共有大约 346 万条日志音讯。一个不小的数字!让咱们提取并查看一些日志音讯。

sample_logs = [item['value'] for item in base_df.take(15)]

sample_lo东风汽车,Apache Spark 完结可扩展日志剖析,开掘系统最大潜力(2),年会节目gs

提取主机名

让咱们测验编写一些正则表达式来从日志中提取主机名。

host_pattern = r'(^\S+\.[\S+\.]+\S+)\s'

hosts = [re.search(host_pattern, item).group(1)

if re.search(host_pattern, item)

else 'no match'

for item in sample_logs]

hosts

[‘1悬空寺99.72.81.55’,

** ‘unicomp6.unicomp.net’,**

** ‘199.120.110.21’,**

** ‘burger.let石俊男ters.com’,**

…,

…,

** ‘unicomp6.unicomp.net’,**

** ‘d10赖兴发4.aa.net’,**

** ‘d104.aa.net’]**


提取时刻戳

现在让咱们测验运用正则表达式从日志中提取时刻戳字段。

ts_pattern = r'\[(\d{2}/\w{3}/\d{4}:\d{2}:\d{2}:\d{2} -\d{4})]'

timestamp东风汽车,Apache Spark 完结可扩展日志剖析,开掘系统最大潜力(2),年会节目s = [re.search(ts_pattern, item).group(1) for item in sample_logs]

timest李多喜amps

[‘01/Jul/1995:00:00:01 -0400’,

‘01/Jul/1995:00:00:06 -0400’,

‘01/Jul/1995:00:00:09 -0400’,

…,

…,

‘01/Jul/1995:00:00:14 -0400’,

‘01/Jul/1995:00:00:15 -0400’,

‘01/Jul/1995:00:00:15 -0400’]

提取 HTTP 恳求办法、URI 和协议

现在让咱们测验运用正则表达式从日志中提取 HTTP 恳求办法、URI 和协议形式字段。

method_uri_proto桐华col_pattern = r'\"(\S+)\s(\S+)\s*(\S*)\"'

method_uri_protocol = [re.search(method_uri_protocol_pattern, item).groups()

if re.search(method_uri_protocol_pattern, 东风汽车,Apache Spark 完结可扩展日志剖析,开掘系统最大潜力(2),年会节目item)

else 'no match'

for item in sample_logs]

method_uri_protocol

[(‘GET’, ‘/history/apollo/’, ‘HTTP/1.0’),

** (‘GET’, ‘/shuttle/countdown/’, ‘HTTP/1.0’),**

…,

…,

** (‘GET’, ‘/shuttle/countdown/count.gif’, ‘HTTP/1.0’),**

** (‘GET’, ‘/images/NASA-logosmall.gif’, ‘HTTP/1.0’)]**


提取 HTTP 状况码

现在让咱们测验运用正则表达式从日志中提取 HTTP 状况码。

content_size_pattern = r'\s(\d+)$'

content_size = [re.search(content_size_pattern, item).group(1) for item in sample_logs]

print(content_size)

[‘200’, ‘200’, ‘200’, ‘304’, …, ‘200’, ‘200’]


提取 HTTP 呼应内容巨细

现在让咱们测验运用正则表达式从日志中提取 HTTP 呼应内容巨细。

content_size_pattern = r'\s(\d+)$'

content_size = [re.search(content_size_pattern, item).group(1) for item in sample_logs]

print(content_size)

[‘6245’, ‘3985’, ‘40亟待85’, ‘0’, …, ‘1204’, ‘40310’, ‘786’]


把它们放在一同

现在,让咱们测验运用前面构建的全部正则表达式形式,并运用regexp_extract(…)办法构建 DataFrame,全部日志特点都规整地提取到各自的列中。

from pyspark.sql.functions import regexp_extractd8

logs_df = base_df.select(regexp_extract('value', host_pattern, 1).alias('host'),

regexp_extract('value', ts_pattern, 1).alias('timestamp'),

regexp_extract('value', method_uri_protocol_pattern, 1).alias('method'),

regexp_extract('value', method_uri_protocol_pattern, 2).alias('endpoint'),

regexp_extract('value', method_uri_protocol_pattern, 3).alias('protocol'),

regexp_extract('value', status_pattern, 1).cast('integer').alias('status'),

regexp_extract('value', content_size_pattern, 1).cast('integer').alias('content_size'))

logs_df.show(10, truncate=True)

print((logs_df.count(), len(logs_df.columns)))


查找缺失值

缺失值和空值是数据剖析和机器学习的祸源。让咱们看看咱们的数据解析和提取逻辑是怎样作业的。首要,让咱们验证原始数据框中有没有空行。

(base_df

.filter(base_df['value']

.isNull())

.count())

0

没问题!现在,假如咱们的数据解析和提取作业正常,咱们就不应该有任何或许存在空值的行。让咱们来试试吧!

bad_rows_df = logs_df.filter(logs_df['host'].isNull()|

logs_df['timestamp'].isNull() |

logs_df['method'].isNull() |

logs_df['endpoint'].isNull() |

logs_df['status'].isNull() |

logs_df['content_size'].isNull()|

logs_df['protocol'].isNull())

bad_rows_df.count()

33905

哎哟!看起来咱们的数据中有超越 33K 的缺失值!咱们能搞定吗?

请记住,这不是一个惯例的 pandas DataFrame,你无法直接查询并取得哪些列为空。咱们所谓的大数据集驻留在磁盘上,它或许存在于 Spark 集群中的多个节点上。那么咱们怎样找出哪些列有或许为空呢?


查找 Null 值

咱们一般能够运用以下技能找出哪些列具有空值。(留意:这种办法是从 StackOverflow 上的一个绝妙的答复改造而来的。)

from pyspark.sql.functions import col

from pyspark.sql.functions import sum as spark_sum

def count_null(col_name):

return spark_sum(col(col_name).isNull().cast('integer')).alias(col_name)

# Build up a list of column expressions, one per column.

exprs = [count_null(col_name) for col_name in logs_df.columns]

# Run the aggregation. The *exprs converts the list of expressions into

# variable function arguments.

logs_df.agg(*exprs).show()

看起来status列中有一个缺失值而其它的都在content_size列中。让咱们看看能不能找出问题所在!


处理 HTTP 状况中的空值

状况列解析运用的原始正则表达式是:

regexp_extract('value', r'\s(\d{3})\s', 1).cast('integer')

.alias( 'status')

是否有更多的数字使正则表达式犯错?仍是数据点自身的问题?让咱们试着找出答案。

留意:鄙人面的表达式中,~表明“非”。

null_status_df = base_df.filter(~base_df['value'].rlike(r'\s(\d{3})\s'))

null_status_df.count()

1

让咱们看看这条糟糕的记载是什么姿态?

null_status_df.show(truncate=False)

看起来像一条有许多信息丢掉的记载!让咱们经过日志数据解析管道来传递它。

bad_sta假面骑士amazonstus_df = null_status_df.select(regexp_extract('value', host_pattern, 1).alias('host'),

regexp_extract('value', 宥怎样读ts_pattern, 1).al执政大明ias('timestamp'),

regexp_extract('value', method_uri_protocol_pattern, 1).alias('method'),

regexp_extract('value', method_uri_protocol_pattern, 2).alias('endpoint'),

regexp_extract('value', meth口技od_uri_protocol_pattern, 3).alias('protocol'),

regexp_extract('value', status_pattern,东风汽车,Apache Spark 完结可扩展日志剖析,开掘系统最大潜力(2),年会节目 1).cast('integer').alias('status'),

reg羊水指数exp_extract('value', content_size_pattern, 1).cast('integer').alias('content_size'))

bad_status_df.show(truncate=False)

(图片)

看起来这条记载自身是一个不完整买单吧的记载,没有有用的信息,最好的挑选是删去这条记载,如下所示!

logs_df = logs_df[logs_df['status'].isNotNull()]

exprs = [count_null(col_name) for col_name in logs_df.columns]

logs_df.agg(*exprs).show()

处理 HTTP content size 列中的空值

依据之前的正则表达式,content_size列的原始解析正则表达式为:

regexp_extract('value', r'\s(\d+)$', 1).cast('integer')

.alias('content_size')

原始数据会集是否有数据丢掉?让咱们试着找出答案吧!咱们首要测验找出根本 DataFrame 中或许短少内容巨细的记载。

null_content_size_df = base_df.filter(~base_df['value'].rlike(r'\s\d+$'))

null_content_size_df.count()

33905

这个数值好像与处理后的 DataFrame 中缺失的内容巨细的数量相匹配。让咱们来看看咱们的数据框中短少内容巨细的前十条记载。

null_content_size_df.take(10)

很明显,糟糕的原始数据记载对应过错呼应,其间没有发回任何内容,服务器为content_size字段发出了一个“-”。

由于咱们不想从咱们的剖析中丢掉这些行,所以咱们把它们代入或填充为 0。


修正 content_size 为 null 的行

最简略的解决方案是像前面评论的那样,用 0 替换logs_df中的 null 值。Spark DataFrame API 供给了一组专门为处理 null 值而规划的函数和字段,其间包括:

fillna():用指定的非空值填充空值。

na:它回来一个DataFrameNaFunctions目标,其间包括许多用于在空列上进行操作的函数。

有几种办法能够调用这个函数。最简略的办法便是用已知值替换全部空列。可是,为了安全起见,最好传递一个包括(column_name, value)映射的 Python 字典。这便是咱们要做的。下面是文档中的一个示例:

>>> df4.na.fill({'age': 50, 'name': 'unknown'}).show()

+---+------+-------+

|age|height| name|

+---+------+-------+

| 10| 80| Alice|

| 5| null| Bob|

| 50| null| Tom|

| 50| null|unknown|

+---+------+-------+

现在咱们运用这个函数,用 0 填充content_size字段中全部缺失的值!

logs_df = logs_df.na.fill({'content_size': 0})

exprs = [count_null(伯妮丝col_name) for col_name in logs_df.columns]

logs_df.agg(*exprs).show()

看,没有缺失值了!


处理时刻字段(时刻戳)

现在咱们有了一个洁净的、已解析的 DataFrame,咱们必须将 timestamp 字段解析为一个实践的时刻戳大理昌杨记。通用的日志格局时刻有点不规范。用户界说函数(UDF)是解析它最直接的办法。

from pyspark.sql.functions impor半生缘t udf

month_map = {

'Jan': 1, 'Feb': 2, 'Mar':3, 'Apr日本姓氏':4, 'May':5, 'Jun':6, 'Jul':7,

'Aug':8, 'Sep': 9, 'Oct':10, 'Nov': 11, 'Dec': 12

}

def parse_clf_time(text):

""" Convert Common Log time format into a Python datetime object

Args:

text (str): date and time in Apache time format [dd/mmm/yyyy:hh:mm:ss (+/-)zzzz]

Returns:

a string suitable for passing to CAST('timestamp')

"""

# NOTE: We're ignoring the time zones here, might need to be handled depending on the problem you are solving

return "{0:04d}-{1:02d}-{2:02d} {3:02d}:{4:02d}:{5:02d}".format(

int(text[7:11]),

month_map[text[3:6]],

int(text[0:2]),

int(text[12:14]),

int(text[15:17]),

int东风汽车,Apache Spark 完结可扩展日志剖析,开掘系统最大潜力(2),年会节目(text[18:20])

)

现在,让咱们运用这个函数来解析 DataFrame 中的time列。

udf_parse_time = udf(parse_clf_tim东风汽车,Apache Spark 完结可扩展日志剖析,开掘系统最大潜力(2),年会节目e)

logs_df = (logs_df.select('*', udf_parse_time(logs_df['timestamp'])

.cast('timestamp')

.alias('time'))

.drop('timestamp')

logs_df.show(10, truncate=True)

全部看起来都很好!让咱们经过查看 DataFrame 的形式来验证这一点。

logs_df.printSchema()

root

|-- host: string (nullable = true)

|-- method: string (nullable = true)

|-- endpoint: string (nullable = true)

|-- protocol: string (nullable = true)

|-- status: integer (nullable = true)

|-- content_size: integer (nullable = false)

|-- time: timestamp (nullable = true)

现在,让咱们缓存logs_df,由于咱们将鄙人一部分的数据剖析部分中许多地运用它!

logs_df.cache()

作者:Dipanjan Sarkar; 译者:平川;原文:https://towardsdatascience.com/scalable-log-analytics-with-apache-spark-a-comprehensive-case-study-2be3eb3be977