Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- # загрузка библиотек
- import pandas as pd
- import numpy as np
- import pyspark
- from pyspark.sql import SparkSession
- from pyspark.sql.types import *
- import pyspark.sql.functions as F
- from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler
- from pyspark.ml.classification import LogisticRegression
- pyspark_version = pyspark.__version__
- if int(pyspark_version[:1]) == 3:
- from pyspark.ml.feature import OneHotEncoder
- elif int(pyspark_version[:1]) == 2:
- from pyspark.ml.feature import OneHotEncodeEstimator
- RANDOM_SEED = 2022
- # инициируем сессию спарк
- spark = SparkSession.builder \
- .master("local") \
- .appName("Housing - Linreg") \
- .getOrCreate()
- # загружаем данные
- df = spark.read.option('header', 'true').csv('/datasets/housing.csv', inferSchema = True)
- # проверяем названия полей
- df.columns
- # считаем количество записей
- df.count()
- # удаляем записи с пропущенными значениями
- subset = ['longitude',
- 'latitude',
- 'housing_median_age',
- 'total_rooms',
- 'total_bedrooms',
- 'population',
- 'households',
- 'median_income',
- 'median_house_value',
- 'ocean_proximity']
- df = df.na.drop(subset=subset)
- # считаем количество записей после удаления
- df.count()
- # размечаем поля с таргетом, входящими категориальными, входящими числовыми
- categorical_cols = ['ocean_proximity']
- numerical_cols = [
- 'longitude',
- 'latitude',
- 'housing_median_age',
- 'total_rooms',
- 'total_bedrooms',
- 'population',
- 'households',
- 'median_income',
- ]
- target = 'median_house_value'
- # трансформация категориальных входящих признаков
- indexer = StringIndexer(inputCols=categorical_cols,
- outputCols=[c+'_idx' for c in categorical_cols])
- df = indexer.fit(df).transform(df)
- cols = [c for c in df.columns for i in categorical_cols if (c.startswith(i))]
- df.select(cols).show(3)
- encoder = OneHotEncoder(inputCols=[c+'_idx' for c in categorical_cols],
- outputCols=[c+'_ohe' for c in categorical_cols])
- df = encoder.fit(df).transform(df)
- cols = [c for c in df.columns for i in categorical_cols if (c.startswith(i))]
- df.select(cols).show(3)
- categorical_assembler = VectorAssembler(inputCols=[c+'_ohe' for c in categorical_cols], outputCol="categorical_features")
- df = categorical_assembler.transform(df)
- # трансформация числовых входящих признаков
- numerical_assembler = VectorAssembler(inputCols=numerical_cols, outputCol="numerical_features")
- df = numerical_assembler.transform(df)
- standardScaler = StandardScaler(inputCol='numerical_features', outputCol="numerical_features_scaled")
- df = standardScaler.fit(df).transform(df)
- df.columns
- # соединяем все признаки в векторном представлении
- all_features = ['categorical_features','numerical_features_scaled']
- final_assembler = VectorAssembler(inputCols=all_features, outputCol="features")
- df = final_assembler.transform(df)
- df.select(all_features).show(3)
- # разделяем на обучающую и тестовую выборки
- train_data, test_data = df.randomSplit([.8,.2], seed=RANDOM_SEED)
- print(train_data.count(), test_data.count())
- # обучаем модель
- lr = LogisticRegression(labelCol=target, featuresCol='features')
- model = lr.fit(train_data)
- # на последнем шаге блокнот уходит на вычисления и уже не возвращается (на блоке отображается звездочка, можно ждать хоть час - изменений нет, хотя это линейная модель.
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement