Table of Contents
- 1. NumPy
- 1.1. frequent operations on shape
- 1.2. theory
- 1.3. shape size dtype etc:
- 1.4. basic
- 1.5. masking and comparison
- 1.6. LOOPING
- 1.7. replace
- 1.8. round
- 1.9. keras.utils.to_categorical
- 1.10. save and saves
- 1.11. ignore items on diagonal
- 1.12. get items below diagonal (triangleform from squareform)
- 2. pandas
- 2.1. read csv
- 2.2. sort
- 2.3. replace value
- 2.4. analysis
- 2.5. Series
- 2.6. DataFrame
- 2.7. index and levels
- 2.8. WHERE AND FILTERS
- 2.9. COUNT
- 2.10. RESHAPINGS guide https://pandas.pydata.org/docs/user_guide/reshaping.html
- 2.10.1. Resample for timeseries
- 2.10.2. pivot - rows to columns without aggregation
- 2.10.3. stack (levels)
- 2.10.4. melt - columns to rows
- 2.10.5. pivot_table - allow aggs
- 2.10.6. pivot tables(old)
- 2.10.7. crosstab - frequencies
- 2.10.8. cut - transform continuous variables to discrete or categorical variables
- 2.10.9. dummies
- 2.10.10. factorize - categories to numbers
- 2.10.11. explode
- 2.10.12. assign and explode - split values to rows
- 2.11. Merge, join, and concatenate
- 2.12. DISTINCT groupby
- 2.13. two dataframes
- 2.14. Map, Apply, Applymap
- 2.15. save and load
- 2.16. NaN
- 2.17. Categorical encoding
- 2.18. mem usage
- 2.19. rename column
- 2.20. delete column
- 2.21. delete row
- 2.22. type
- 2.23. if a>5 c = True else False
- 2.24. OTHER USE CASES
- 2.25. troubleshooting
- 3. h5py
- 4. DVC
- 5. matplotlib
- 5.1. base
- 5.2. subplot or multiple diagram in one window
- 5.3. x axis labels range
- 5.4. Matplotlib is currently using agg, which is a non-GUI backend, so cannot show the figure.
- 5.5. usage
- 5.6. do not close
- 5.7. Multiple Curves
- 5.8. two windows with separate legend
- 5.9. custom histogram
- 5.10. rotate x ticks
- 5.11. CASES
- 6. SciPy
- 7. Scikit-learn
- 8. TODO statsmodels
- 9. TODO RAPIDS
- 10. TensorFlow (TF)
- 10.1. history
- 10.2. terms
- 10.3. Features:
- 10.4. hello world
- 10.5. deployment
- 10.6. ecosystem
- 10.7. layers
- 10.8. Eager vs Graph execution
- 10.9. TF 2.0
- 10.10. Save a model
- 10.11. datasets
- 10.12. tf.data.dataset
- 10.13. install
- 10.14. install from source
- 10.15. APIs
- 10.16. tf.placeholder
- 10.17. Logger = Disable
- 10.18. 4D tensor
- 10.19. install
- 10.20. Deploy
- 10.21. tensor
- 10.22. hardware
- 10.23. hello world
- 10.24. main objects
- 10.25. Variables
- 10.26. TensorBoard
- 10.27. GPU
- 10.28. keras
- 10.29. CNN
- 10.30. RNN and LSTM
- 10.31. plot learning curve
- 10.32. plot CNN layout
- 10.33. Optimizer
- 10.34. models - tensorflow_models as tfm
- 10.35. TensorFlow Serving
- 10.36. TODO TFX pipeline - MLOps
- 10.37. loss
- 10.38. ctc_loss
- 10.39. custom metric
- 10.40. distributed training
- 10.41. toy model MNIST
- 10.42. logging
- 10.43. callbacks for model.fit
- 10.44. USE CASES
- 10.45. common errors:
- 11. PyTorch
- 11.1. install
- 11.2. history
- 11.3. deployment
- 11.4. ecosystem
- 11.5. PyTorch 2.0
- 11.6. device
- 11.7. models - torchvision.models
- 11.8. nn.Module
- 11.9. Dataset and DataLoader, transform
- 11.10. Built-in datasets
- 11.11. train
- 11.12. train (old)
- 11.13. loss, inference, accuracy
- 11.14. numpy
- 11.15. layers
- 11.16. noise
- 11.17. basic nn and gradient
- 11.18. LSTM
- 11.19. Distributed - torch.distributed
- 11.20. retain_graph
- 11.21. memory management
- 11.22. troubleshooting
- 11.23. plot learning curve
- 11.24. links
- 12. TODO PaddlePaddle 飞桨
# -*- mode: Org; fill-column: 110; coding: utf-8; -*-
#+TITLE: Python for data science
1. NumPy
1.1. frequent operations on shape
1.1.1. reshape((-1, 1))
import numpy as np
x = np.array([1, 2, 3, 4, 5])
print(np.concatenate((x, x)).reshape((-1, 1)))

[[1]
 [2]
 [3]
 [4]
 [5]
 [1]
 [2]
 [3]
 [4]
 [5]]
1.2. theory
[ˈnʌmpaɪ] large, multi-dimensional arrays and matrices. BSD-new license. multi-dimensional container of generic data
- a powerful N-dimensional array object
- sophisticated (broadcasting) functions
- useful linear algebra, Fourier transform, and random number capabilities
ndarray - n-dimensional array
- homogeneously typed: all elements of a single array must be of the same type
- np.pad(…) routine to extend arrays actually creates new arrays of the desired shape and padding values, copies the given array into the new one and returns it
Type hint
def f(x: np.ndarray) -> np.ndarray
- ... (Ellipsis) works like : for the remaining axes; ones[:, 5] - the fifth column (example below)
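A minimal sketch of np.pad and Ellipsis indexing mentioned above; the array shapes and values are illustrative:

import numpy as np

a = np.ones((3, 3))
padded = np.pad(a, pad_width=1, constant_values=0)  # new 5x5 array, original copied into the middle
print(padded.shape)     # (5, 5)

b = np.arange(24).reshape(2, 3, 4)
print(b[..., 0].shape)  # (2, 3) - Ellipsis stands for "all remaining axes"
print(b[0, ...].shape)  # (3, 4)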
1.3. shape size dtype etc:
- ndarray.shape
- ndarray.size - product of the numbers in shape
- ndarray.dtype - bool_, character, int8, int16, int32, int64, float8, float16, float32, float64, complex64, object_
- ndarray.itemsize - size of one element in bytes
- ndarray.data - buffer access back to Python - not recommended to use (see the check below)
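A quick check of these attributes (values are illustrative):

import numpy as np

a = np.zeros((3, 4), dtype=np.int16)
print(a.shape)     # (3, 4)
print(a.size)      # 12 - product of the shape
print(a.dtype)     # int16
print(a.itemsize)  # 2 - bytes per element
print(a.ndim)      # 2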
1.4. basic
import numpy as np

a = np.array([1, 2, 3])
a[[1, 2]]                      # array([2, 3])

>>> np.arange(4).reshape((2, 2))
array([[0, 1],
       [2, 3]])
>>> a = np.arange(4).reshape((2, 2))
>>> a.sum(axis=0)
array([2, 4])
>>> a.sum(1)
array([1, 5])
>>> a.sum(-1)
array([1, 5])

x = np.array([[1, 2], [3, 4]])
x[:, 0]                        # array([1, 3])

np.zeros((3, 5), dtype=float)  # dtype defaults to float
np.ones((2, 2, 2))             # all ones
np.eye(5)                      # ones on the diagonal
np.empty((3, 3))               # uninitialized - whatever was in memory
np.arange(10, 30, 5)           # like range
np.linspace(0, 2, 9)           # 9 evenly spaced values from 0 to 2
np.logspace(start, stop, num=50, endpoint=True, base=10.0)  # base**start .. base**stop
np.amax(nparray)               # max element
np.amin(nparray)               # min element
np.nanmin(data[:, 1])          # min element of column 1, ignoring NaN
self.img[:] = 255              # replace every element with a single value

# filter None elements:
self.contours = np.array(list(filter(lambda x: x is not None, self.contours)))

a = np.linspace(-np.pi, np.pi, 100)
b = np.sin(a)
c = np.cos(a)

# Linear algebra
from numpy.random import rand
from numpy.linalg import solve, inv
a = np.array([[1, 2, 3], [3, 4, 6.7], [5, 9.0, 5]])
a.transpose()
# array([[ 1. ,  3. ,  5. ],
#        [ 2. ,  4. ,  9. ],
#        [ 3. ,  6.7,  5. ]])
inv(a)
# array([[-2.27683616,  0.96045198,  0.07909605],
#        [ 1.04519774, -0.56497175,  0.1299435 ],
#        [ 0.39548023,  0.05649718, -0.11299435]])
b = np.array([3, 2, 1])
solve(a, b)                    # solve the equation ax = b
# array([-4.83050847,  2.13559322,  1.18644068])
c = rand(3, 3) * 20            # 3x3 random matrix of values within [0,1] scaled by 20
np.dot(a, c)                   # matrix multiplication
a @ c                          # since Python 3.5 and NumPy 1.10

# per-column operations
data[:, 1] = data[:, 1] - data_min
data[:, 1] += 1

# add a dimension
x = np.expand_dims(x, axis=0)
x = x[np.newaxis, :]

# elements at positions
a = a[np.array([1, 2, 10, 3])]
1.5. masking and comparison
- x>1 - Boolean array indexing [True, False]
- x[x>1] - select elements with True
- (a[1,:]!=2) & (a[1,:]!=2) - and
- cv2.bitwise_not(gray)
a = np.array([1, 2, 3, 4, 4])
# get elements where > 2
a[np.where(a > 2)]  # array([3, 4, 4])
a[a > 2]            # array([3, 4, 4])
1.6. LOOPING
subtract every element of [9,3,6] from [1,2,3,4,5,6] and find the min of the absolute differences:
import numpy as np
c = [1, 2, 3, 4, 5, 6]
s = [9, 3, 6]
su = np.repeat([c], len(s), axis=0).T - s
m = np.min(np.abs(su), axis=0)
print(m)
1.7. replace
my_array[my_array == 8] = 20
my_array[(my_array > 8) | (my_array < 6)] = 20
result = np.where(new_array == np.inf, 0, new_array)  # inf
result = np.where(np.isinf(a), 999999, a)
result = np.where(np.isnan(a), 0, a)
np.place(new_values, new_values < 0, [0])
1.8. round
a = np.array([1.1, 1.5, 1.9], float)
>>> np.floor(a)
array([ 1.,  1.,  1.])
>>> np.ceil(a)
array([ 2.,  2.,  2.])
>>> np.rint(a)
array([ 1.,  2.,  2.])
1.9. keras.utils.to_categorical
1.9.1. basic
y_classes = keras.utils.to_categorical(range(len(paths)))  # classes array in one-hot
train_y.append(y_classes[i])    # to set
# back
out = model.predict
i = np.argmax(out, axis=-1)[0]  # id
paths[i]                        # original
1.9.2. add sum category
>>> c
array([[1., 0.],
       [0., 1.]], dtype=float32)
np.append(c, [c[0] + c[1]], axis=0)
# result:
array([[1., 0.],
       [0., 1.],
       [1., 1.]], dtype=float32)
1.10. save and saves
np.save('123', data)  # creates 123.npy
data = np.load('../123.npy', mmap_mode=None)
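If several arrays have to go into a single file, np.savez (or np.savez_compressed) can be used; a minimal sketch, the file name and keys are arbitrary:

import numpy as np

a = np.arange(5)
b = np.eye(3)
np.savez('arrays.npz', first=a, second=b)  # one .npz file with two named arrays

loaded = np.load('arrays.npz')
print(loaded['first'])          # [0 1 2 3 4]
print(loaded['second'].shape)   # (3, 3)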
1.11. ignore items on diagonal
not_diag = np.where(~np.eye(dists.shape[0], dtype=bool))
cl_distance = np.mean(dists[not_diag])  # mean may be replaced with something close to the median
1.12. get items below diagonal (triangleform from squareform)
get upper triangleform:
C3 = np.triu(C2)
get lower triangleform:
C3 = np.tril(C2)
get elements:
arr2 = np.where(np.tri(arr.shape[0],arr.shape[1], k = -1) == 1)
2. pandas
2.1. read csv
pd.read_csv(p, index_col=0, sep='\t')
- sep='\t' - sometimes columns are separated by \t; usually it is a comma
2.2. sort
df.sort_values(by='Клиент')             # axis=0 (default): sort rows by the values of a column
df.sort_values(by=<row_label>, axis=1)  # axis=1: pass a row index label, the columns are sorted by that row
2.3. replace value
- new column must be created
df.loc[df.Followers == 'N/A', 'Followers'] = np.nan
- can use regex
df['Followers'].replace(to_replace='N/A', value=np.nan)
- can use any function
3.1) on series
df['holiday'] = df['holiday'].apply(lambda x: 1 if x != 0 else 0)
3.2) raw=True gives big speed up
df.apply(lambda row: sum_square(row[0], row[1]), raw=True, axis=1 )
- convert DataFrame to numpy
2.4. analysis
import pandas as pd
AH = pd.read_csv('a.csv', header=0, index_col=False)
print(df.head())                   # first 5 lines
print(df.shape)
print(df.dtypes.to_string())       # types of all columns
print(df.columns)                  # names of all columns
print(df.iloc[:])                  # all rows
print(df['birth_date'])            # one column values
print(df.isnull().values.any())    # any NaN?
print(df.describe(include='all'))  # per column: unique, mean, std, min, quantiles

df.iloc[1, :].value_counts()
# 100    1
# 400    1
# 300    1
# 200    1
df.iloc[1, :].value_counts(normalize=True)
# 100    0.25
# 400    0.25
# 300    0.25
# 200    0.25

# Categorical or not. Unique values
categorial_columns = [c for c in data.columns if data[c].dtype.name == 'object']
categorial_columns = df.select_dtypes(include=["object"]).columns   # or
numerical_columns = [c for c in data.columns if data[c].dtype.name != 'object']
numerical_columns = df.select_dtypes(exclude=["object"]).columns    # or
print(data[categorial_columns].describe())
# unique:
for c in categorial_columns:
    print(c, data[c].unique())

# histogram
import matplotlib
matplotlib.use('TkAgg')
from matplotlib import pyplot as plt
AH['SalePrice'].hist(bins=60, density=True)  # calls matplotlib.pyplot.hist; 'normed' is deprecated
plt.show()

# plot a column
sales.iloc[:, 1].plot()
2.5. Series
One-dimensional ndarray with axis labels
combine along index
- pd.concat([s1,s2], axis=1)
for dataframes merge:
- df1.reset_index()
- df2.reset_index()
- df1.merge(df2)
mydict = [{'a': 1, 'b': 2, 'c': 3, 'd': 4},
          {'a': 100, 'b': 200, 'c': 300, 'd': 400},
          {'a': 1000, 'b': 2000, 'c': 3000, 'd': 4000}]
df = pd.DataFrame(mydict)
df.iloc[0]           # {'a': 1, 'b': 2, 'c': 3, 'd': 4}
type(df.iloc[0])     # <class 'pandas.core.series.Series'>
df.iloc[[0, 1, 2]]   # same as df and as df.iloc[:3]
df.iloc[0, 1]        # 2
df.values            # convert to numpy
2.6. DataFrame
Two-dimensional, size-mutable data. Container for Series objects
# 1) from a dict
d = {'col1': [1, 2], 'col2': [3, 4]}
s1 = pd.DataFrame(data=d)

# 2) from a list of tuples
staff = [(col, melb_df[col].nunique(), melb_df[col].dtypes) for col in melb_df.columns]
unique_counts = pd.DataFrame(
    staff,
    columns=['Column_Name', 'Num_Unique', 'Type']
).sort_values(by='Num_Unique', ignore_index=True)
2.7. index and levels
- default - created autoincrement int
- df.set_index('c')
- df.reset_index(drop=True, inplace=True) - moves the index into a column and creates a new default index; with drop=True the old index is discarded (default: drop=False)
- df.index = Series - ad hoc index
- df.index.name - index column name
index and columns may have multiple levels
- a multilevel index is created by groupby (see the sketch below)
- df.loc[index, (column|:)] - get values at index
- df.iloc[integer] - get values at position
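A minimal sketch of a multilevel index produced by groupby and read back with loc (column names are made up):

import pandas as pd

df = pd.DataFrame({'shop': ['a', 'a', 'b'],
                   'item': ['x', 'y', 'x'],
                   'qty': [1, 2, 3]})
g = df.groupby(['shop', 'item'])['qty'].sum()  # Series with a 2-level index
print(g.loc[('a', 'y')])  # 2
print(g.loc['a'])         # sub-Series for the first level value 'a'
print(g.reset_index())    # back to flat columns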
2.8. WHERE AND FILTERS
https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#boolean-indexing methods
- loc
  - df.loc[(df['Salary_in_1000']>=100) & (df['Age']<60) & (df['FT_Team'].str.startswith('S')), ['Name','FT_Team']]
  - df.index[(df['Salary_in_1000']>=100) & (df['Age']<60)]
- numpy where
  - idx = np.where((df['Salary_in_1000']>=100) & (df['Age']<60) & (df['FT_Team'].str.startswith('S')))
  - df.loc[idx]
- query
  - df.query('Salary_in_1000 >= 100 & Age < 60 & FT_Team.str.startswith("S").values')
- boolean indexing
  - df[(df['Salary_in_1000']>=100) & (df['Age']<60) & df['FT_Team'].str.startswith('S')][['Name','Age','Salary_in_1000']]
- eval
  - df[df.eval("Salary_in_1000>=100 & (Age <60) & FT_Team.str.startswith('S').values")]
bool - | or, & and, ~ not
# DATAFRAME --------
df.shop_id.nunique()
df[df > 100]          # NaN, NaN, 101
df[df.shop_id > 20]   # filter works!
# making a boolean series for a team name
filter1 = data["Team"] == "Atlanta Hawks"
# making a boolean series for age
filter2 = data["Age"] > 24
# filtering data on the basis of both filters
data.where(filter1 & filter2, inplace=True)

# SERIES -------------
s = pd.Series(range(5))  # 0, 1, 2, 3, 4
s.where(s > 1, -1)       # -1, -1, 2, 3, 4
s.mask(s > 1, -1)        # 0, 1, -1, -1, -1
s[s > 2]                 # 3, 4
2.8.1. filter by date
df = df.dropna(subset=['Дата_заключения_контракта_d'])
d0101 = pd.to_datetime('20190101', format='%Y%m%d', errors='ignore')
d0731 = pd.to_datetime('20190731', format='%Y%m%d', errors='ignore')
# chained comparisons do not work on Series, combine two conditions with &
df = df[(df['Дата_заключения_контракта_d'] >= d0101) & (df['Дата_заключения_контракта_d'] <= d0731)]
2.9. COUNT
2.9.1. get unique rows with count
a = pd.DataFrame(a.groupby(['Коды отказа', 'Описание кодов отказа']).size().reset_index(name="count"))
a = pd.DataFrame(a)
c_row = a.pop('count')
a.insert(0, 'count', c_row)
a.sort_values(by=['count'], ascending=False).to_csv('kod_otkaza.csv')
2.9.2. count example
#   Person   Age  Single
# 0   John  24.0   False
# 1   Myla   NaN    True
# 2  Lewis  21.0    True
# 3   John  33.0    True
# 4   Myla  26.0   False

# create a multiindex and count
df.set_index(["Person", "Single"]).count(level="Person")
# John     2
# Lewis    1
# Myla     1
df.set_index(["Person", "Single"]).count(level="Single")
# False    2
# True     2
2.9.3. most frequent
pd.Series([2, 3, 4, 5, 6]).value_counts().idxmax()
2.10. RESHAPINGS guide https://pandas.pydata.org/docs/user_guide/reshaping.html
2.10.1. Resample for timeseries
- 'M' - month boundary
- 'A' - annual
loan_rev_data = data[['Loan Amount']].copy()
loan_rev_data['date'] = pd.DatetimeIndex(data['Created Date'])
loan_rev_data = loan_rev_data.set_index('date')
monthly_loan_rev_data = loan_rev_data.resample('M').sum()

            Loan Amount
date
2014-10-31  13039283.00
2014-11-30  16097733.00
2014-12-31  29077334.00
2.10.2. pivot - rows to columns without aggregation
Uses unique values from specified index / columns to form axes of the resulting DataFrame
params: index, columns, values
import pandas as pd
df = pd.DataFrame({'foo': ['one', 'one', 'one', 'two', 'two', 'two'],
                   'bar': ['A', 'B', 'C', 'A', 'B', 'C'],
                   'baz': [1, 2, 3, 4, 5, 6],
                   'zoo': ['x', 'y', 'z', 'q', 'w', 't']})
print(df)
print()
print(df.pivot(index='foo', columns='bar', values='baz'))
foo bar baz zoo 0 one A 1 x 1 one B 2 y 2 one C 3 z 3 two A 4 q 4 two B 5 w 5 two C 6 t bar A B C foo one 1 2 3 two 4 5 6
A possible mistake:
import pandas as pd df = pd.DataFrame({"foo": ['one', 'one', 'two', 'two'], "bar": ['A', 'A2', 'B', 'C'], # new columns should not have duplicates in one index "baz": [1, 2, 3, 4]}) print(df.pivot(index='foo', columns='bar', values='baz'))
bar A A2 B C foo one 1.0 2.0 NaN NaN two NaN NaN 3.0 4.0
2.10.3. stack (levels)
import pandas as pd
df_single_level_cols = pd.DataFrame([[0, 1], [2, 3]],
                                    index=['cat', 'dog'],
                                    columns=['weight', 'height'])
print(df_single_level_cols)
print()
print(df_single_level_cols.stack())
weight height cat 0 1 dog 2 3 cat weight 0 height 1 dog weight 2 height 3 dtype: int64
2.10.4. melt - columns to rows
- ex1
import pandas as pd df = pd.DataFrame( { "first": ["John", "Mary"], "last": ["Doe", "Bo"], "height": [5.5, 6.0], "weight": [130, 150], }) print(df) print() print(df.melt(id_vars=["first", "last"]))
first last height weight 0 John Doe 5.5 130 1 Mary Bo 6.0 150 first last variable value 0 John Doe height 5.5 1 Mary Bo height 6.0 2 John Doe weight 130.0 3 Mary Bo weight 150.0
- ex2
import pandas as pd
df = pd.DataFrame({'A': {0: 'a', 1: 'b', 2: 'c'},
                   'B': {0: 1, 1: 3, 2: 5},
                   'C': {0: 2, 1: 4, 2: 6}})
print(df)
print()
print(pd.melt(df, id_vars=['A'], value_vars=['B']))
A B C 0 a 1 2 1 b 3 4 2 c 5 6 A variable value 0 a B 1 1 b B 3 2 c B 5
2.10.5. pivot_table - allow aggs
- ex1
import pandas as pd
import numpy as np
import datetime

df = pd.DataFrame(
    {
        "A": ["one", "one", "two", "three"] * 6,
        "B": ["A", "B", "C"] * 8,
        "C": ["foo", "foo", "foo", "bar", "bar", "bar"] * 4,
        "D": np.random.randn(24),
        "E": np.random.randn(24),
        "F": [datetime.datetime(2013, i, 1) for i in range(1, 13)]
             + [datetime.datetime(2013, i, 15) for i in range(1, 13)],
    })
print(df)
print()
print(pd.pivot_table(df, values="D", index=["A", "B"], columns=["C"]))
print()
print(pd.pivot_table(df, values="D", index=["B"], columns=["A", "C"], aggfunc=np.sum))
A B C D E F 0 one A foo 0.834789 -0.268575 2013-01-01 1 one B foo -0.332062 -0.324379 2013-02-01 2 two C foo -2.095669 -2.186134 2013-03-01 3 three A bar -0.793498 0.126653 2013-04-01 4 one B bar 0.117796 -0.845898 2013-05-01 5 one C bar 1.016105 -0.369420 2013-06-01 6 two A foo 1.151064 -0.698485 2013-07-01 7 three B foo -0.487159 0.123010 2013-08-01 8 one C foo -1.456931 1.230448 2013-09-01 9 one A bar -0.591074 -0.851506 2013-10-01 10 two B bar 1.332696 0.161591 2013-11-01 11 three C bar 0.033348 -0.187387 2013-12-01 12 one A foo -1.159041 0.321096 2013-01-15 13 one B foo 0.353786 0.724629 2013-02-15 14 two C foo -1.765572 -0.708540 2013-03-15 15 three A bar 0.805330 -0.652539 2013-04-15 16 one B bar -0.124616 0.014006 2013-05-15 17 one C bar -0.052215 -0.168125 2013-06-15 18 two A foo 0.921741 0.280954 2013-07-15 19 three B foo -0.584663 0.727251 2013-08-15 20 one C foo -1.740931 1.516952 2013-09-15 21 one A bar -0.189743 -0.515618 2013-10-15 22 two B bar -0.099166 0.002090 2013-11-15 23 three C bar -0.487092 -0.996470 2013-12-15 C bar foo A B one A -0.390408 -0.162126 B -0.003410 0.010862 C 0.481945 -1.598931 three A 0.005916 NaN B NaN -0.535911 C -0.226872 NaN two A NaN 1.036402 B 0.616765 NaN C NaN -1.930620 A one three two C bar foo bar foo bar foo B A -0.780817 -0.324252 0.011831 NaN NaN 2.072805 B -0.006820 0.021724 NaN -1.071822 1.23353 NaN C 0.963890 -3.197862 -0.453743 NaN NaN -3.861240
- ex2
import pandas as pd
import numpy as np

print(pd.pivot_table(df[["A", "B", "C", "D", "E"]], index=["A", "B"], columns=["C"]))
print()
print(pd.pivot_table(df, values="D", index=pd.Grouper(freq="M", key="F"), columns="C"))
print()
table = pd.pivot_table(df, index=["A", "B"], columns=["C"], values=["D", "E"])
print(table.to_string(na_rep=""))
print()
table = df.pivot_table(
    index=["A", "B"], columns="C", values=["D", "E"], margins=True, aggfunc=np.std)
print(table)
print()
print(table.stack())
D E C bar foo bar foo A B one A -0.390408 -0.162126 -0.683562 0.026260 B -0.003410 0.010862 -0.415946 0.200125 C 0.481945 -1.598931 -0.268773 1.373700 three A 0.005916 NaN -0.262943 NaN B NaN -0.535911 NaN 0.425131 C -0.226872 NaN -0.591928 NaN two A NaN 1.036402 NaN -0.208765 B 0.616765 NaN 0.081840 NaN C NaN -1.930620 NaN -1.447337 C bar foo F 2013-01-31 NaN -0.162126 2013-02-28 NaN 0.010862 2013-03-31 NaN -1.930620 2013-04-30 0.005916 NaN 2013-05-31 -0.003410 NaN 2013-06-30 0.481945 NaN 2013-07-31 NaN 1.036402 2013-08-31 NaN -0.535911 2013-09-30 NaN -1.598931 2013-10-31 -0.390408 NaN 2013-11-30 0.616765 NaN 2013-12-31 -0.226872 NaN D E C bar foo bar foo A B one A -0.390408 -0.162126 -0.683562 0.026260 B -0.003410 0.010862 -0.415946 0.200125 C 0.481945 -1.598931 -0.268773 1.373700 three A 0.005916 -0.262943 B -0.535911 0.425131 C -0.226872 -0.591928 two A 1.036402 -0.208765 B 0.616765 0.081840 C -1.930620 -1.447337 D E C bar foo All bar foo All A B one A 0.283784 1.409851 0.840699 0.237509 0.416961 0.494677 B 0.171411 0.484967 0.297085 0.608044 0.741761 0.658146 C 0.755417 0.200819 1.283359 0.142337 0.202589 0.958996 three A 1.130542 NaN 1.130542 0.550971 NaN 0.550971 B NaN 0.068946 0.068946 NaN 0.427263 0.427263 C 0.368006 NaN 0.368006 0.572108 NaN 0.572108 two A NaN 0.162156 0.162156 NaN 0.692568 0.692568 B 1.012479 NaN 1.012479 0.112784 NaN 0.112784 C NaN 0.233414 0.233414 NaN 1.044817 1.044817 All 0.651877 1.140991 0.940582 0.408882 0.998514 0.759845 D E A B C one A All 0.840699 0.494677 bar 0.283784 0.237509 foo 1.409851 0.416961 B All 0.297085 0.658146 bar 0.171411 0.608044 foo 0.484967 0.741761 C All 1.283359 0.958996 bar 0.755417 0.142337 foo 0.200819 0.202589 three A All 1.130542 0.550971 bar 1.130542 0.550971 B All 0.068946 0.427263 foo 0.068946 0.427263 C All 0.368006 0.572108 bar 0.368006 0.572108 two A All 0.162156 0.692568 foo 0.162156 0.692568 B All 1.012479 0.112784 bar 1.012479 0.112784 C All 0.233414 1.044817 foo 0.233414 1.044817 All All 0.940582 0.759845 bar 0.651877 0.408882 foo 1.140991 0.998514
2.10.6. pivot tables(old)
melb_df.groupby(['Rooms', 'Type'])['Price'].mean()            # hierarchical index
melb_df.groupby(['Rooms', 'Type'])['Price'].mean().unstack()  # unstacks the inner level into columns
melb_df.pivot_table(
    values='Price',
    index='Rooms',
    columns='Type',
    fill_value=0
).round()  # same as the previous one
2.10.7. crosstab - frequencies
frequency table of the factors unless an array of values and an aggregation function are passed.
import pandas as pd
import numpy as np

foo, bar, dull, shiny, one, two = "foo", "bar", "dull", "shiny", "one", "two"
a = np.array([foo, foo, bar, bar, foo, foo], dtype=object)
b = np.array([one, one, two, one, two, one], dtype=object)
c = np.array([dull, dull, shiny, dull, dull, shiny], dtype=object)
print("frequencies:")
print(pd.crosstab(a, b))
print()
print(pd.crosstab(a, [b, c], rownames=["a"], colnames=["b", "c"]))
frequencies: col_0 one two row_0 bar 1 1 foo 3 1 b one two c dull shiny dull shiny a bar 1 0 0 1 foo 2 1 1 0
2.10.8. cut - transform continuous variables to discrete or categorical variables
import pandas as pd
import numpy as np

ages = np.array([10, 15, 13, 12, 23, 25, 28, 59, 60])
print(pd.cut(ages, bins=3))
print()
print(pd.cut(ages, bins=[0, 18, 35, 70]))
[(9.95, 26.667], (9.95, 26.667], (9.95, 26.667], (9.95, 26.667], (9.95, 26.667], (9.95, 26.667], (26.667, 43.333], (43.333, 60.0], (43.333, 60.0]] Categories (3, interval[float64, right]): [(9.95, 26.667] < (26.667, 43.333] < (43.333, 60.0]] [(0, 18], (0, 18], (0, 18], (0, 18], (18, 35], (18, 35], (18, 35], (35, 70], (35, 70]] Categories (3, interval[int64, right]): [(0, 18] < (18, 35] < (35, 70]]
2.10.9. dummies
- pd.get_dummies(df, prefix="new_prefix")
- pd.from_dummies(df, sep="_") - inverse operation (round-trip sketch below)
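A small round-trip sketch (pd.from_dummies requires pandas >= 1.5; data is made up):

import pandas as pd

df = pd.DataFrame({'animal': ['cat', 'dog', 'cat']})
d = pd.get_dummies(df)              # columns: animal_cat, animal_dog
print(d)
back = pd.from_dummies(d, sep="_")  # recovers the original 'animal' column
print(back)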
2.10.10. factorize - categories to numbers
import pandas as pd
import numpy as np

x = pd.Series(["A", "A", np.nan, "B", 3.14, np.inf])
labels, uniques = pd.factorize(x)
print(labels)
print(uniques)

[ 0  0 -1  1  2  3]
Index(['A', 'B', 3.14, inf], dtype='object')
2.10.11. explode
import pandas as pd
import numpy as np

keys = ["panda1", "panda2", "panda3"]
values = [["eats", "shoots"], ["shoots", "leaves"], ["eats", "leaves"]]
df = pd.DataFrame({"keys": keys, "values": values})
print(df)
print()
print(df["values"].explode())
print()
print(df.explode("values"))
keys values 0 panda1 [eats, shoots] 1 panda2 [shoots, leaves] 2 panda3 [eats, leaves] 0 eats 0 shoots 1 shoots 1 leaves 2 eats 2 leaves Name: values, dtype: object keys values 0 panda1 eats 0 panda1 shoots 1 panda2 shoots 1 panda2 leaves 2 panda3 eats 2 panda3 leaves
2.10.12. assign and explode - split values to rows
import pandas as pd import numpy as np df = pd.DataFrame([{"var1": "a,b,c,d", "var2": 1}, {"var1": "d,e,f", "var2": 2}]) print(df) print() print(df.assign(var1=df.var1.str.split(",")).explode("var1"))
var1 var2 0 a,b,c,d 1 1 d,e,f 2 var1 var2 0 a 1 0 b 1 0 c 1 0 d 1 1 d 2 1 e 2 1 f 2
2.11. Merge, join, and concatenate
https://pandas.pydata.org/pandas-docs/stable/user_guide/merging.html
One table split into two parts:
- top and bottom: pd.concat([s1, s2], ignore_index=True)
- left and right: ?
- concatenate - by default rows are appended; defaults: axis=0, join='outer', ignore_index=False
- pd.concat([df1, df4], axis=1, sort=False) - rows with the same index values are aligned, NaNs are added where there is no match
- join='inner' - keeps only the shared index, so no NaNs are added
SQL style
- merge - ignore index, uses specified column
- pd.merge(playdata, genetic_train, on="SK_ID_CURR", how="left") - if the right side has duplicate keys, all of them are included in the result
- "on" must be found in both DataFrames
- indicator=True - adds _merge field with ['left_only', 'right_only', 'both']
- join - uses index column
- first you should set index to joined columns
- table1.join(table2, lsuffix='_table1', rsuffix='_table2',how="left")
new column:
df['asd'] = list
2.11.1. concat series
>>> df
   0
0  1
2  3
>>> df2
   0
0  1
1  2
>>> pd.concat([df, df2], axis=1)
     0    0
0  1.0  1.0
2  3.0  NaN
1  NaN  2.0
import pandas as pd
s1 = pd.Series(['a', 'b'])
s2 = pd.Series(['c', 'd'])
print(pd.concat([s1, s2], ignore_index=True))

0    a
1    b
2    c
3    d
dtype: object
2.11.2. concat dataframes vertically
import pandas as pd
df1 = pd.DataFrame({'lkey': ['foo', 'bar', 'baz', 'foo'], 'value': [1, 2, 3, 5]})
df2 = pd.DataFrame({'rkey': ['foo', 'bar', 'baz', 'foo'], 'value': [5, 6, 7, 8]})
print(df2)
print(pd.concat([df1, df2], ignore_index=True))
rkey value 0 foo 5 1 bar 6 2 baz 7 3 foo 8 lkey value rkey 0 foo 1 NaN 1 bar 2 NaN 2 baz 3 NaN 3 foo 5 NaN 4 NaN 5 foo 5 NaN 6 bar 6 NaN 7 baz 7 NaN 8 foo
2.11.3. merge
import pandas as pd
left = pd.DataFrame(
    {
        "key": ["K0", "K1", "K2", "K3"],
        "A": ["A0", "A1", "A2", "A3"],
        "B": ["B0", "B1", "B2", "B3"],
    }
)
right = pd.DataFrame(
    {
        "key": ["K0", "K1", "K2", "K3", "K0"],  # K0 duplicate
        "C": ["C0", "C1", "C2", "C3", "C3"],
        "D": ["D0", "D1", "D2", "D3", "D3"],
    }
)
result = pd.merge(left, right, on="key", how='left')
print(result)

  key   A   B   C   D
0  K0  A0  B0  C0  D0
1  K0  A0  B0  C3  D3
2  K1  A1  B1  C1  D1
3  K2  A2  B2  C2  D2
4  K3  A3  B3  C3  D3
2.11.4. add by date
def add_holiday_features(df, dfh):
    df['date'] = df['pickup_datetime'].dt.date
    df['date'] = df['date'].astype(str)
    df = df.merge(dfh, 'left', on='date')
    df['holiday'].fillna(0, inplace=True)
    df['holiday'] = df['holiday'].apply(lambda x: 1 if x != 0 else 0)
    df.drop(columns=['date'], inplace=True)
    return df
2.12. DISTINCT groupby
print(df.groupby('shop_id').item_id.value_counts())
print(df.groupby('shop_id').item_id.nunique())
dfg = df[['shop_id', 'item_id']].groupby('shop_id')
print(dfg.agg(['mean', 'count', 'min']))
2.12.1. row number by group - add row numbering within groups
df['Номер_контракта'] = df.groupby(['Клиент'])['Дата_заключения_контракта'].cumcount()+1
2.13. two dataframes
- df1['prices_match'] = np.where(df1['price_1'] == df2['price_2'], 'True', 'False')
- turn values to sets and compare https://numpy.org/doc/stable/reference/routines.set.html
- dfa[dfa['users_id'].isin(dft['users_id'])]
2.13.1. sets comparison
def count_fkey(key1, key2):
    un1 = np.unique(key1)
    un2 = np.unique(key2)
    cm = np.in1d(un1, un2, assume_unique=True)
    if 'name' in dir(key1):
        print(f"Unique [{key1.name}]: {un1.size}")
        print(f"Unique [{key2.name}]: {un2.size}")
    else:
        print(f"key1: {un1.size}")
        print(f"key2: {un2.size}")
    c = np.unique(cm, return_counts=True)
    print(pd.DataFrame({'values': c[0], 'count': c[1]}))
2.14. Map, Apply, Applymap
2.14.1. Comparing map, applymap and apply: Context Matters
First major difference: DEFINITION
- map is defined on Series ONLY
- applymap is defined on DataFrames ONLY
- apply is defined on BOTH
Second major difference: INPUT ARGUMENT
- map accepts dicts, Series, or callable
- applymap and apply accept callables only
Third major difference: BEHAVIOR
- map is elementwise for Series
- applymap is elementwise for DataFrames
- apply also works elementwise but is suited to more complex operations and aggregation. The behaviour and return value depends on the function.
Fourth major difference (the most important one): USE CASE
- map is meant for mapping values from one domain to another, so it is optimised for performance (e.g., df['A'].map({1:'a', 2:'b', 3:'c'}))
- applymap is good for elementwise transformations across multiple rows/columns (e.g., df[['A', 'B', 'C']].applymap(str.strip))
- apply is for applying any function that cannot be vectorised (e.g., df['sentences'].apply(nltk.sent_tokenize))
Footnotes
- map when passed a dictionary/Series will map elements based on the keys in that dictionary/Series. Missing values will be recorded as NaN in the output.
- applymap in more recent versions has been optimised for some operations. You will find applymap slightly faster than apply in some cases. My suggestion is to test them both and use whatever works better. (deprecated)
- map is optimised for elementwise mappings and transformation. Operations that involve dictionaries or Series will enable pandas to use faster code paths for better performance.
- Series.apply returns a scalar for aggregating operations, Series otherwise. Similarly for DataFrame.apply. Note that apply also has fastpaths when called with certain NumPy functions such as mean, sum, etc.
2.14.2. apply to column
df['A'] = df['A'].apply(lambda x: str.strip(x) if pd.notna(x) else x)
2.14.3. return multiple rows
return pd.Series([1,2,3]) ; df['a'].apply(f).to_numpy()[:,1] - time 13 sec
return [1,2,3] ; list(zip(*df['a'].apply(f).to_list())) - time 28.6 sec
2.14.4. example
s.map('I am a {}'.format)
s.map({' <=50K.': 0, ' >50K.': 1})
s.map({'fox': 'cub', 'cow': 'calf'})
df['result'] = df['result'].map({b'OK': 1, b'STOP': 0})
df.iloc[:, 0] = df.iloc[:, 0].map({b'OK': 1, b'STOP': 0})

DataFrame.applymap(self, func)  # to the whole DataFrame
DataFrame.apply(self, func, axis=0, raw=False, result_type=None, args=(), **kwds)
Series.map(self, arg, na_action=None)  # arg: function, collections.abc.Mapping subclass or Series

df.iloc[:, 2].map(lambda x: x * x) == df.iloc[:, 2].apply(lambda x: x * x)
2.15. save and load
df.to_pickle('b')
df: pandas.DataFrame = pandas.read_pickle('b')
2.15.1. read_csv
# column names
columns = ['age', 'workclass', 'fnlwgt', 'education', 'education-num',
           'marital-status', 'occupation', 'relationship', 'race', 'sex',
           'capital-gain', 'capital-loss', 'hours-per-week', 'native-country', 'income']
df = pd.read_csv('adult.data', header=None, names=columns, na_values=' ?')
2.15.2. json
pd.read_json('test_data.txt') - {"Клиент":"customer_3567","Дата_заключения_контракта":"2018-05-12","Дата_закрытия_контракта":"2018-06-13","Плановая_дата_закрытия_контракта":"2018-06-13","Сумма_выдачи_по_контракту":21891},{"Клиент":"customer_39200","Дата_заключения_контракта":"2019-03-29","Дата_закрытия_контракта":"2019-04-05","Плановая_дата_закрытия_контракта":"2019-04-05","Сумма_выдачи_по_контракту":11480},{"Клиент":"customer_26509","Дата_заключения_контракта":"2019-03-29","Дата_закрытия_контракта":"2019-04-30","Плановая_дата_закрытия_контракта":"2019-04-28","Сумма_выдачи_по_контракту":2640},{"Клиент":"customer_26623","Дата_заключения_контракта":"2019-03-06","Дата_закрытия_контракта":"2019-03-29","Плановая_дата_закрытия_контракта":"2019-04-06","Сумма_выдачи_по_контракту":25038},{"Клиент":"customer_14647","Дата_заключения_контракта":"2019-03-29","Дата_закрытия_контракта":"2019-04-15","Плановая_дата_закрытия_контракта":"2019-04-15","Сумма_выдачи_по_контракту":6369},{"Клиент":"customer_29658","Дата_заключения_контракта":"2019-12-05","Плановая_дата_закрытия_контракта":"2019-12-27","Сумма_выдачи_по_контракту":24172},{"Клиент":"customer_37798","Дата_заключения_контракта":"2019-11-18","Дата_закрытия_контракта":"2019-12-05","Плановая_дата_закрытия_контракта":"2019-12-18","Сумма_выдачи_по_контракту":9867},
2.16. NaN
select rows where the index is NaN:
- df.loc[df.index.isnull()]
2.16.1. check
- df.isnull().values.any() # true or false
- df.isnull().sum() # count per column
- s.hasnans # true or false (Series attribute)
2.16.2. replace
- df.dropna(subset=['column_name'], inplace=True)
- df['col'].fillna(0, inplace=True)
2.16.3. drop
df.dropna(subset=['col1', 'col2'],inplace=True) # remove rows if NaN in col1 or col2 column
2.16.4. get not na
df = df[~df['col'].isna()]
2.16.5. other
# MEAN
from sklearn.impute import SimpleImputer  # replaces the removed sklearn.preprocessing.Imputer
# define the values to replace and the strategy for choosing the replacement value
imp = SimpleImputer(missing_values=np.nan, strategy="mean")
cols = [1, 13]
df[cols] = imp.fit_transform(applicants[cols])

# REMOVE: string -> NaN
applicants[cols] = applicants[cols].apply(pd.to_numeric, errors='coerce')
2.17. Categorical encoding
2.17.1. replace values
df['a'] = df['a'].map({b'OK': 1, b'STOP': 0})
replace date:
def repl_date(df_in: DataFrame):
    df = df_in.copy()  # no side effect
    for i, x in enumerate(df.iloc[0, :]):
        if isinstance(x, date):
            # print(i, type(x))
            cname = df.columns[i]
            df[cname] = df[cname].map(lambda x: x.year)
    return df
2.17.2. label encoding
for c in label_e_columns:
    # values before encoding (print them before overwriting the column with codes)
    print(dict(enumerate(df[c].astype('category').cat.categories)))
    df[c] = df[c].astype('category').cat.codes
2.17.3. encode binary
df['income'] = df['income'].map({' <=50K': 0, ' >50K': 1})
df['income'] = df['income'].notnull().astype(int)
2.17.4. onehot encode
df = pd.get_dummies(df, dummy_na=False)  # dummy_na=True for debug

s = pd.Series(list('abca'))
pd.get_dummies(s)
#    a  b  c
# 0  1  0  0
# 1  0  1  0
# 2  0  0  1
# 3  1  0  0
2.18. mem usage
# Great snippet from https://www.kaggle.com/gemartin/load-data-reduce-memory-usage
def reduce_mem_usage(df):
    """Iterate through all the columns of a dataframe and modify the data type
    to reduce memory usage."""
    start_mem = df.memory_usage().sum() / 1024**2
    print('Memory usage of dataframe is {:.2f} MB'.format(start_mem))
    for col in df.columns:
        col_type = df[col].dtype
        if col_type != object:
            c_min = df[col].min()
            c_max = df[col].max()
            if str(col_type)[:3] == 'int':
                if c_min > np.iinfo(np.int8).min and c_max < np.iinfo(np.int8).max:
                    df[col] = df[col].astype(np.int8)
                elif c_min > np.iinfo(np.int16).min and c_max < np.iinfo(np.int16).max:
                    df[col] = df[col].astype(np.int16)
                elif c_min > np.iinfo(np.int32).min and c_max < np.iinfo(np.int32).max:
                    df[col] = df[col].astype(np.int32)
                elif c_min > np.iinfo(np.int64).min and c_max < np.iinfo(np.int64).max:
                    df[col] = df[col].astype(np.int64)
            else:
                if c_min > np.finfo(np.float16).min and c_max < np.finfo(np.float16).max:
                    df[col] = df[col].astype(np.float16)
                elif c_min > np.finfo(np.float32).min and c_max < np.finfo(np.float32).max:
                    df[col] = df[col].astype(np.float32)
                else:
                    df[col] = df[col].astype(np.float64)
        # else:
        #     df[col] = df[col].astype('category')
    end_mem = df.memory_usage().sum() / 1024**2
    print('Memory usage after optimization is: {:.2f} MB'.format(end_mem))
    print('Decreased by {:.1f}%'.format(100 * (start_mem - end_mem) / start_mem))
    return df
2.19. rename column
df.columns.str.replace("original_column", "APP_SRC_REF")
may rename several columns!
- ('doggod', 'god')
- df.columns.str.replace("god", "war")
- ('dogwar', 'war')
df.rename(columns={"0":"0col", "1": "1col", 2:"2col", 3:"3col"}, inplace=True)
2.20. delete column
- df.drop('education', axis=1, inplace=True)
- df.drop(['education', 'fabrication'], axis=1, inplace=True)
or
- df.drop(columns=['education', 'fabrication'], inplace=True)
- df.drop(df.iloc[:,1:3], axis=1)
- del df['education']
2.21. delete row
2.21.1. delete NA
df.dropna(axis='index', subset=['column1'])
for x in ['sd', 'a2']:
    ids = df.index[(df["code"] == x) & (df["something"] == 1)]
    if len(ids) != 0:
        df.drop(ids, inplace=True)
2.21.2. delete values that is in other df column
import pandas as pd
df1 = pd.DataFrame(data={'col1': [1, 2, 3, 4, 5, 3], 'col2': [10, 11, 12, 13, 14, 10]})
df2 = pd.DataFrame(data={'col1': [1, 2, 3], 'col2': [10, 11, 12]})
print(df1)
print(df2)
df_all = df1.merge(df2.drop_duplicates(), on=['col1', 'col2'], how='left', indicator=True)
print(df_all)
print(df_all[df_all['_merge'] == 'left_only'])
col1 col2 0 1 10 1 2 11 2 3 12 3 4 13 4 5 14 5 3 10 col1 col2 0 1 10 1 2 11 2 3 12 col1 col2 _merge 0 1 10 both 1 2 11 both 2 3 12 both 3 4 13 left_only 4 5 14 left_only 5 3 10 left_only col1 col2 _merge 3 4 13 left_only 4 5 14 left_only 5 3 10 left_only
2.22. type
automatic conversion: pd.to_numeric(s, errors=...), pd.to_datetime(s, errors=...)
errors = {'ignore', 'raise', 'coerce'}, default 'raise'
- ignore - invalid parsing will return the input unchanged
- coerce - invalid parsing will be set as NaN (sketch below)
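A short sketch of the errors parameter (errors='ignore' may emit a deprecation warning in recent pandas):

import pandas as pd

s = pd.Series(['1', '2', 'oops'])
print(pd.to_numeric(s, errors='coerce'))  # 1.0, 2.0, NaN
print(pd.to_numeric(s, errors='ignore'))  # returns the input unchanged
# pd.to_numeric(s, errors='raise') would raise a ValueError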
2.22.1. types https://numpy.org/doc/stable/reference/arrays.scalars.html
| Pandas dtype  | Python type  | NumPy type                                                     |
|---------------+--------------+----------------------------------------------------------------|
| object        | str or mixed | string_, unicode_, mixed types                                 |
| Int64/Int32   | int          | int_, int8, int16, int32, int64, uint8, uint16, uint32, uint64 |
| float64       | float        | float_, float16, float32, float64                              |
| bool          | bool         | bool_                                                          |
| boolean       | allows NaN   | ?                                                              |
| datetime64    | NA           | datetime64[ns]                                                 |
| timedelta[ns] | NA           | NA                                                             |
| category      | NA           | NA                                                             |
2.22.2. Display types
print(df1.dtypes)
categorial_columns = df.select_dtypes(include=["object"]).columns
numerical_columns = df.select_dtypes(exclude=["object"]).columns
print(data[categorial_columns].describe())
# unique
for c in categorial_columns:
    print(c, data[c].unique())
2.22.3. float to int
with NaN
df['col'] = df['col'].round().astype('Int32')
without NaN
- drop or fill NaN
- df['col'] = df['col'].round().astype(int)
2.22.4. string to date
df['col1'] = pd.to_datetime(df['col1'])
df['Дата рождения клиента'] = pd.to_numeric(
    2021 - pd.to_datetime(df['Дата рождения клиента']).dt.year).astype('Int32')
2.22.5. Category type
object string to category:
- .astype("category")
2.23. if a>5 c = True else False
https://datatofish.com/if-condition-in-pandas-dataframe/
df.loc[df['set_of_numbers'] <= 4, 'flag'] = 'True'
df['flag'].fillna(False, inplace=True)
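An alternative sketch with np.where that yields real booleans instead of the mixed string/bool column above (column name is illustrative):

import numpy as np
import pandas as pd

df = pd.DataFrame({'set_of_numbers': [1, 6, 3, 9]})
df['flag'] = np.where(df['set_of_numbers'] > 5, True, False)
print(df)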
2.24. OTHER USE CASES
2.24.1. dictionary for panda
def list_to_dict(dicts: list) -> dict:
    """from [{'col1': 1, 'col2': 3}, {'col1': 2, 'col2': 4}]
    to {'col1': [1, 2], 'col2': [3, 4]}
    :param dicts: list of dicts
    :return: dictionary for pandas
    """
    d = {}  # target {'col1': [1, 2], 'col2': [3, 4]}
    for k in dicts[0].keys():
        d[k] = []
    for x in dicts:
        for k in dicts[0].keys():
            d[k].append(x[k])
    return d
2.24.2. Example from dictionary to onehot
def list_to_dict(dicts: list) -> dict:
    """from [{'col1': 1, 'col2': 3}, {'col1': 2, 'col2': 4}]
    to {'col1': [1, 2], 'col2': [3, 4]}
    :param dicts: list of dicts
    :return: dictionary for pandas
    """
    d = {}  # target {'col1': [1, 2], 'col2': [3, 4]}
    for k in dicts[0].keys():
        d[k] = []
    for x in dicts:
        for k in dicts[0].keys():
            d[k].append(x[k])
    return d


def repl_date(df_in: DataFrame):
    df = df_in.copy()  # no side effect
    for i, x in enumerate(df.iloc[0, :]):
        if isinstance(x, date):
            # print(i, type(x))
            cname = df.columns[i]
            df[cname] = df[cname].map(lambda x: x.year)
    return df


def one_hot_p(dicts: list):
    d = list_to_dict(dicts)
    df = pd.DataFrame(d)
    df.iloc[:, 0] = df.iloc[:, 0].map({b'OK': 1, b'STOP': 0})
    df = repl_date(df)
    # print(df.to_string())
    df2 = pd.get_dummies(df)
    return df2
2.24.3. remove meaningless columns
df.fillna(0)
for x in df.iloc[:]:
    if df[x].min() == df[x].max():
        del df[x]
2.24.4. Sum two columns containing NaN values
total = df['Jan'] + df['Feb'].fillna(0)
2.24.5. reorder columns
# first way
target = df.pop('first_decision_state')
df.insert(1, 'first_decision_state', target)

# second way
cols = df.columns.tolist()
cols = cols[-1:] + cols[:-1]  # move the last column to the front
df = df[cols]
2.24.6. TODO remove duplicates
- df.sort_values(by=['id', 'completed_at'], na_position='first')
- df.drop_duplicates('id', keep='last')
2.24.7. replace missing values by groups
df["value"] = df.groupby("name").transform(lambda x: x.fillna(x.mean()))
df.reset_index(inplace=True, drop=True)
shit_cols = ['pickup_day_of_week', 'geo_cluster', 'events']
shits = []
for shit in shit_cols:
    shits.append(pd.get_dummies(df[shit], prefix=shit, drop_first=True))
    print(pd.get_dummies(df[shit], prefix=shit))
shits = pd.concat(shits, axis=1)
print(shits.head())
print("How many binary columns were generated by one-hot encoding?\n", len(shits.columns))
# ['pickup_day_of_week_1', 'pickup_day_of_week_2', 'pickup_day_of_week_3', 'pickup_day_of_week_4',
#  'pickup_day_of_week_5', 'pickup_day_of_week_6', 'geo_cluster_1', 'geo_cluster_2', 'geo_cluster_3',
#  'geo_cluster_4', 'geo_cluster_5', 'geo_cluster_6', 'geo_cluster_7', 'geo_cluster_8', 'geo_cluster_9',
#  'events_None', 'events_Rain', 'events_Snow']
df = pd.concat([df.drop(columns=shit_cols), shits], axis=1)
2.24.8. add count of occurences column
df['count'] = df.groupby('Col1')['Col1'].transform('size')
2.25. troubleshooting
df['binary'][0] = 23
SettingWithCopyWarning: rewrite:
df.loc[0, 'binary'] = 23
df.loc[:, c] = pd.Series([2, 3, 4])
3. h5py
emerge dev-python/h5py
Groups work like dictionaries, and datasets work like NumPy arrays.
3.1. Dataset object
datasets support operations:
- compression
- error-detection
- chunked I/O
attributes:
- shape
- size
- ndim
- dtype
- nbytes
3.2. terms
- datasets
- array-like collections of data
- groups
- folder-like containers that hold datasets and other groups
3.3. open
- h5py.File() - acts like a Python dictionary
3.4. usage
import h5py
f = h5py.File('mytestfile.hdf5', 'r')
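A slightly longer sketch creating a group, a dataset and an attribute (file and object names are arbitrary):

import h5py
import numpy as np

with h5py.File('mytestfile.hdf5', 'w') as f:
    grp = f.create_group('measurements')                     # groups work like dictionaries
    dset = grp.create_dataset('data', data=np.arange(100))   # datasets work like NumPy arrays
    dset.attrs['unit'] = 'mm'                                 # attach metadata

with h5py.File('mytestfile.hdf5', 'r') as f:
    d = f['measurements/data']
    print(d.shape, d.dtype, d.attrs['unit'])
    print(d[:10])  # slicing reads only the requested part from disk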
3.5. links
4. DVC
fetch data from external sources, codify data/models, and build reproducible pipelines.
4.1. features:
- allows downloading data from supported sources and keeps hashes of the files.
- versioning through codification - metafiles describing datasets, ML artifacts, etc. to track.
- allows creating pipelines, fixing inputs and outputs, and avoiding reruns.
- DVCLive - a tool for experiment tracking
- allows creating a development server with shared and cached data; cached data may be shared between projects.
allows:
- Data validation: for example, validation against a schema or verifying pipeline consistency — correct shapes, data types, etc.
- Model validation: for example, input/output and performance validation — all dependencies present for inference to run, and model scores within thresholds.
4.2. problem
things that are hard to track and store in Git:
- large datasets
- machine learning models - binary
4.3. terms
- data registry
- git + dvc repository - for versioning of data and model files. The data itself is stored in one or more DVC remotes
- DVC remotes
- similar to Git remotes, used with dvc push and dvc pull commands. To add: dvc remote to .dvc/config.
- stage
- processing step of pipeline. allow connecting code to its corresponding data input/dependencies and output.
- dependencies
- input for a stage. Specified as paths in the deps field of dvc.yaml or a .dvc file. Stages are invalidated (considered outdated) when any of their dependencies change.
- output
- result of stage, tracked by DVC.
- parameters
- granular dependencies of stage, such as "batch size", DVC can track any key/value pair in a supported parameters file (params.yaml by default)
- metrics
- feature of "experiments" - allow compare results.
- cache
- hidden storage .dvc/cache
4.4. steps
- dvc init # running inside a Git project
- git commit -m "dvc init"
4.4.1. data:
way 1) git source
- dvc get downloads a file: dvc get https://github.com/iterative/dataset-registry get-started/data.xml -o data/data.xml
- dvc add - start tracking the dataset file; creates data/data.xml.dvc. Similar to git add.
- git add data/data.xml.dvc data/.gitignore
- git commit -m "Add raw data"
way 2) local directory
- mkdir /tmp/dvcstore
- dvc remote add -d myremote /tmp/dvcstore
Now we have
- file data/data.xml
- a record for this file in .gitignore
- data/data.xml.dvc - hash
dvc checkout to sync data into your workspace
4.4.2. pipelines
abstract:
- virtualenv venv && echo "venv" > .gitignore
- source venv/bin/activate
- pip install -r src/requirements.txt
actual: .4) Create stage:
dvc stage add -n prepare \
    -p prepare.seed,prepare.split \
    -d src/prepare.py -d data/data.xml \
    -o data/prepared \
    python src/prepare.py data/data.xml
this generates a dvc.yaml file, which contains:
- command that will be run: python src/prepare.py data/data.xml
- -d - for dependencies
- -o - output
- -p - parameter, such as "batch size"
.5) dvc repro - run the pipeline. dvc.lock (a "state file") was created to capture the reproduction's results, that should be added to git.
- automatically determines which parts of a project need to be run
.6) we can use dvc stage add -d data/prepared - to create chain.
.7) dvc dag - visualize the chain of stages
.8) dvc params diff - show differences between iterations of the pipeline; there are also metrics diff and plots diff
4.5. CML - Continuous Machine Learning
orchestration, testing and monitoring.
- manage ML experiments, track who trained ML models or modified data and when.
- Auto-generate reports with metrics and plots
- Build your own ML platform using just GitHub or GitLab and your favorite cloud services: AWS, Azure, GCP, or Kubernetes. No databases, services or complex setup needed.
links
5. matplotlib
5.1. base
ax: Axes = None
fig, ax = plt.subplots(1, 1, figsize=(19, 10))
plt.subplots_adjust(left=0.076, right=0.96, bottom=0.04, top=0.96,
                    wspace=0.30, hspace=0.7)  # if more than one
plt.plot(..., label='line1')
title = "graph"
fig.suptitle('test title', fontsize=20)
plt.suptitle('test title', fontsize=20)  # ?
plt.title('Title!', {'fontsize': 20})
plt.rc('font', size=6)  # set font size
plt.legend()            # add line descriptions
fig.subplots_adjust(left=0.4, bottom=0.4)
plt.tight_layout()      # correct top, left, bottom, right automatically
plt.show()              # or plt.savefig('name')
plt.savefig(title)

# horizontal line
plt.axhline(y=2, color='r', linestyle='dashed', label="red line")
# vertical line
plt.axvline(x=7, color='b', label='axvline - full height')
plt.close()

plt.yticks(range(1, 10))  # ticks on the y axis
ax.set_xlim(left=3)       # start the x axis at 3
5.2. subplot or multiple diagram in one window
import matplotlib.pyplot as plt

fig = plt.figure(figsize=(2, 2))
d1: AxesSubplot = fig.add_subplot(1, 2, 1)  # 1 row, 2 columns - left
d2: AxesSubplot = fig.add_subplot(2, 2, 2)  # 2x2 - top right
d3: AxesSubplot = fig.add_subplot(2, 2, 4)  # 2x2 - bottom right
plt.show()

d: AxesSubplot = fig.add_subplot(121)  # equal to 1, 2, 1
fig.tight_layout()  # create spaces to allow set_title for graphics

# -- define the grid more precisely with ratios
# gs = fig.add_gridspec(nrows=2, ncols=2,
#                       width_ratios=((1,)),   # ncols length
#                       height_ratios=(1, 1),  # nrows
#                       left=0.1, right=0.1, bottom=0.1, top=0.9,
#                       wspace=0.1, hspace=0.1)
# ax = fig.add_subplot(gs[1, 0])
# ax.hist(x, bins=bins1)
5.3. x axis labels range
import matplotlib.ticker as plticker

loc = plticker.MultipleLocator(base=50)
ax.xaxis.set_major_locator(loc)
5.4. Matplotlib is currently using agg, which is a non-GUI backend, so cannot show the figure.
matplotlib.use
5.4.1. TkAgg
import matplotlib
matplotlib.use('TkAgg')
Tkinter is a Python binding to the Tk GUI toolkit. It is the standard Python interface to the Tk GUI toolkit, and is Python's de facto standard GUI.
Gentoo: included with standard Linux
Gentoo: USE="tk"
5.4.2. GTK3Agg
Xfce4 - GTK-based
- find out GTK version: dpkg -l libgtk* | grep -e '^i' | grep -e 'libgtk-*[0-9]'
- find out glib version: ldd --version
- apt install libglib2.0-dev
- apt install libgirepository1.0-dev
- apt install libcairo2-dev
- apt install python3-dev
- pip install pycairo
- apt-get install libgtk-3-dev
- pip3 install PyGObject --user
import matplotlib
matplotlib.use('GTK3Agg')
5.5. usage
from matplotlib import pyplot as plt

# time sequence
plt.plot(range(len(a)), a)
plt.show()

# time sequence - as blue dots
plt.plot(range(len(a)), a, 'bo')
plt.show()

# Histogram - distribution of numerical data
# a bin (bucket) is a discrete interval of the split
N = 100
noise = np.random.normal(loc=0.0, scale=1.0, size=(N, 1))
plt.hist(noise, bins='auto', density=True)
plt.show()

# Scatter - y=f(x) as points, where x is not ordered
plt.scatter(x_np, y_rows)
plt.show()

# as a line
res = sorted(zip(x_np, y_rows), key=lambda k: k[0])  # sort by x
x, y = zip(*res)  # unzip
plt.plot(x, y)
plt.show()

# matr_my - shape=(50, 512), values in [0;1] - draw as a spectrum
plt.pcolormesh(matr_my, cmap='RdBu')
plt.xlabel('Depth')
plt.xlim((0, 512))
plt.ylabel('Position')
plt.colorbar()
plt.show()
5.6. do not close
plt.close()
plt.plot()
plt.draw()
plt.pause(0.0001)
5.7. Multiple Curves
import matplotlib.pyplot as plt

x = [0, 1, 2, 3, 4]
y1 = [2, 3, 5, 7, 8]
y2 = [2, 3, 7, 7, 8]
plt.plot(x, y1, label="1")
plt.plot(x, y2, label="2")
plt.legend()  # without this call the labels are not shown
plt.show()
5.8. two windows with separate legend
x = [0, 1, 2, 3, 4]
y1 = [2, 3, 5, 7, 5]
y2 = [2, 3, 7, 7, 8]

import matplotlib.pyplot as plt

plt.figure()
ax = plt.gca()
plt.plot(x, y1, label="1")
plt.plot(x, y2, label="2")

plt.figure()
plt.plot(x)
plt.figlegend(*ax.get_legend_handles_labels(), loc='upper left')
plt.show()
5.9. custom histogram
# get hist
counts, edges = np.histogram(A, bins=10, range=(0, 10))
bincenters = 0.5 * (edges[1:] + edges[:-1])
spline = make_interp_spline(bincenters, counts, k=k)

# that is how to loop over the bin edges
for pair in zip(edges[:-1], edges[1:]):
    low, high = pair

# back to data
A = np.repeat(edges[:-1], counts)
5.10. rotate x ticks
plt.xticks(rotation=10)
5.11. CASES
5.11.1. TODO bar plot with two y axes
5.11.2. variable in time
plt.plot_date(df['date'], df['x'])
plt.show()
6. SciPy
SciPy adds more MATLAB-like functionality on top of NumPy; Matplotlib is a plotting package that provides MATLAB-like plotting functionality.
6.1. hierarchical clustering
6.1.1. distance and squareform
pdist - Pairwise distances between observations
>> array([0., 2., 2.])
squareform - returns a symmetric matrix where Z(i,j) corresponds to the pairwise distance between observations i and j
dist:
from scipy.spatial.distance import squareform
from scipy.spatial.distance import pdist

d = pdist([[1, 2], [1, 2], [3, 2]])
print(d)
print()
sq = squareform(d)
print(sq)
here the first row of the square matrix, [0. 0. 2.], holds the distances from the first observation to the first, second and third observations
6.1.2. linkage
- hierarchical/agglomerative https://docs.scipy.org/doc/scipy/reference/generated/scipy.cluster.hierarchy.linkage.html#scipy.cluster.hierarchy.linkage
- very similar to the MATLAB linkage function https://www.mathworks.com/help/stats/linkage.html
- better to print with:
[print(i+len(df), x) for i, x in enumerate(l)]
At the i-th iteration, clusters with indices Z[i, 0] and Z[i, 1] are combined to form cluster n + i.
- i-th row - iteration
- 0 and 1 - cluster numbers or observation number if x<n
- 2 - is a distance between 0 and 1
- Z[i, 3] represents the number of original observations in the newly formed cluster
row format of Z: [cluster_i, cluster_j, distance, number of observations in the new cluster] (sketch below)
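A minimal sketch; the points and the 'average' linkage method are arbitrary choices:

from scipy.cluster.hierarchy import linkage
from scipy.spatial.distance import pdist

d = pdist([[1, 2], [1, 2], [3, 2], [10, 10]])
Z = linkage(d, method='average')
# each row: [cluster_i, cluster_j, distance, number of observations in the new cluster]
for i, row in enumerate(Z):
    print(i + 4, row)  # ids of newly formed clusters start at n (= 4 observations here)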
6.1.3. dendrogram
to see the count of observations in clusters, set truncate_mode='level' and p to the desired level:
from matplotlib import pyplot as plt
from scipy.cluster.hierarchy import dendrogram

dendrogram(Z=l, p=1.1, truncate_mode='level', labels=df.index,
           count_sort=False, distance_sort=False,
           orientation='right', leaf_font_size=15)
plt.show()
6.1.4. cophenetic correlation
Pearson correlation between the cophenetic distances implied by the linkage and the original pairwise distances; values close to 1 mean the clustering preserves the original distances well.
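A sketch of computing it with scipy (the points and the linkage method are illustrative):

from scipy.cluster.hierarchy import linkage, cophenet
from scipy.spatial.distance import pdist

X = [[1, 2], [1, 3], [8, 8], [9, 9]]
d = pdist(X)
Z = linkage(d, method='average')
c, coph_dists = cophenet(Z, d)  # c - Pearson correlation; close to 1 is good
print(c)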
7. Scikit-learn
- based on numpy and SciPy
- scikit-learn can be classified as a tool in the "Machine Learning Tools" category, while SciPy is grouped under "Data Science Tools".
7.1. history
- 2007 begin
- 2010 first release
7.2. fast feature selection
- https://scikit-learn.org/stable/modules/feature_selection.html#univariate-feature-selection
- For regression: f_regression, mutual_info_regression
- For classification: chi2, f_classif, mutual_info_classif
- sparse data: chi2, mutual_info_regression, mutual_info_classif will deal with the data without making it dense.
from sklearn.feature_selection import SelectKBest
from sklearn.feature_selection import f_regression  # or chi2

selector = SelectKBest(f_regression, k=25)
X_new = selector.fit_transform(X, y)
names = X.columns.values[selector.get_support()]
scores = selector.scores_[selector.get_support()]
names_scores = list(zip(names, scores))
print("Selected features:")
[print(x) for x in names_scores]
7.3. sklearn.tree.DecisionTreeClassifier
- the algorithm chooses a feature and makes a split
- looks at the subsets and measures their impurity using the gini or entropy score
- tries multiple thresholds and determines the best split for the given feature
- repeat for all features and nodes
- from root to leaves
7.3.1. usage
test = 0  # matrix.shape[0] // 3
train = int(matrix.shape[0] - test)
data_train = matrix[:train, 1:].copy()    # column 0 - labels
labels_train = matrix[:train, 0].copy()
# print(labels_train)
data_test = matrix[train:, 1:].copy()
labels_test = matrix[train:, 0].copy()
print(data_train.shape)
print(data_test.shape)
print(labels_train.shape)

models = []

# DecisionTreeClassifier ------------------------------
from sklearn.tree import DecisionTreeClassifier

data_train[np.isnan(data_train)] = -1  # replace nan
data_train_orig = data_train.copy()
model = DecisionTreeClassifier(random_state=42,
                               # impurity function ('gini' or 'entropy')
                               criterion='gini',
                               # maximum tree depth
                               max_depth=3,
                               # minimum number of samples in a node to split it (may be a fraction)
                               min_samples_split=5,
                               # minimum number of samples in a leaf (may be a fraction)
                               min_samples_leaf=2,
                               # minimum impurity decrease
                               # min_impurity_decrease=0,
                               # class weights (extra penalty for errors in chosen classes);
                               # supports the 'balanced' option
                               class_weight=None,
                               # pre-sorting: speeds up training on small data or with limited tree depth,
                               # otherwise slows training down (removed in newer scikit-learn versions)
                               presort=False
                               )
# train the model
data_train[np.isnan(data_train)] = -1
model.fit(data_train, labels_train)

# delete the most important feature
parent_feature = model.feature_importances_.argmax()  # 0...
print(parent_feature)
data_train[:, parent_feature] = np.zeros(data_train.shape[0])

from IPython.display import Image
from sklearn.tree import export_graphviz
from subprocess import call

export_graphviz(model, out_file='tree.dot',
                # feature names
                # feature_names=X.columns,
                class_names=None,
                # show field names for numeric values inside a node
                label='all',
                # color nodes by the dominant class
                filled=True,
                # show the impurity value for every node
                impurity=True,
                # show node ids
                node_ids=True,
                # show class proportions in nodes (instead of counts)
                proportion=True,
                # rotate the tree by 90 degrees (vertical orientation)
                rotate=False,
                # number of digits for displayed fractions
                # precision=3
                )
# convert tree.dot into tree.png
call(['dot', '-Tpng', 'tree.dot', '-o', 'tree.png'])
# show the image in the notebook
# Image("tree.png")

# data_test[np.isnan(data_test)] = -1
test_result = model.predict(data_train_orig)
# RESULT
auc = sklearn.metrics.roc_auc_score(labels_test, test_result)
gini = 2 * auc - 1
7.4. Tuning the hyper-parameters https://scikit-learn.org/stable/modules/grid_search.html
- GridSearchCV - Exhaustive Grid Search, all parameter combinations
- HalvingGridSearchCV - evaluating all the candidates with a small amount of resources and iteratively selects the best candidates, using more and more resources.
- RandomizedSearchCV - given number of candidates
- HalvingRandomSearchCV - successive halving starting from a random sample of candidates
Successive halving (SH) is an iterative selection process: all candidates (the parameter combinations) are evaluated with a small amount of resources at the first iteration. The resource can be:
- the number of training samples
- arbitrary numeric parameter such as n_estimators in a random forest.
parameters
- factor (> 1) - at each iteration the amount of resources per candidate is multiplied by it and the number of candidates is divided by it (3 usually works well)
- HalvingRandomSearchCV: aggressive_elimination=True can also be used if the number of available resources is small.
RandomizedSearchCV vs GridSearchCV https://analyticsindiamag.com/why-is-random-search-better-than-grid-search-for-machine-learning/
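A small GridSearchCV sketch; the estimator, dataset and grid are arbitrary choices for illustration:

from sklearn.datasets import load_iris
from sklearn.model_selection import GridSearchCV
from sklearn.tree import DecisionTreeClassifier

X, y = load_iris(return_X_y=True)
param_grid = {'max_depth': [2, 3, 5], 'min_samples_leaf': [1, 2, 5]}
search = GridSearchCV(DecisionTreeClassifier(random_state=42),
                      param_grid, cv=5, scoring='accuracy')
search.fit(X, y)
print(search.best_params_, search.best_score_)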
7.5. feature importance
from sklearn.ensemble import GradientBoostingRegressor

dt = GradientBoostingRegressor()
indices = np.argsort(dt.feature_importances_)[::-1]  # sort indexes
print(indices)
for i in range(len(X_column_names)):  # first 100
    print("%d. %s (%f)" % (i + 1, X_column_names[indices[i]],
                           dt.feature_importances_[indices[i]] / 100))
7.6. Encoders - sklearn.preprocessing.*
- OrdinalEncoder
- OneHotEncoder -
- min_frequency=0.5 - all categories with a frequency below min_frequency are grouped into one 'infrequent' column
- TargetEncoder - replaces each category with the target mean conditioned on the value of the category; good for features with high cardinality and high correlation with the target. Shuffles by default, uses internal cross-fitting. (sketch below)
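A short sketch of OrdinalEncoder and OneHotEncoder (the data is made up; sparse_output requires scikit-learn >= 1.2, older versions use sparse=False):

import numpy as np
from sklearn.preprocessing import OrdinalEncoder, OneHotEncoder

X = np.array([['red'], ['green'], ['red'], ['blue']])

ord_enc = OrdinalEncoder()
print(ord_enc.fit_transform(X).ravel())  # e.g. [2. 1. 2. 0.]

oh_enc = OneHotEncoder(sparse_output=False)
print(oh_enc.fit_transform(X))           # one column per category
print(oh_enc.categories_)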
7.7. suppress warnings
import warnings
warnings.filterwarnings("ignore", category=Warning)

from sklearn.metrics import precision_score
y_true = [0, 1, 2, 0, 1, 2]
y_pred = [0, 2, 0, 0, 0, 0]
print(precision_score(y_true, y_pred, average='macro'))
0.13333333333333333
8. TODO statsmodels
used in econometrics, generalised-linear models, time-series-analysis, statistical hypothesis testing, and regression models for "rigorous statistics", for explanatory analysis
9. TODO RAPIDS
GPU accelerated data science
10. TensorFlow (TF)
- lecture https://www.youtube.com/watch?v=sTkUjqsjs00
- tutorial https://www.tensorflow.org/tutorials/
- guide https://www.tensorflow.org/guide/
- lections pdf http://web.stanford.edu/class/cs20si/lectures/
Apache 2.0
- developed by Google
- used for machine learning applications such as neural networks
- Создается вычислительный граф. - Графовый фреймворк
‐ Cleverhans - фреймворд чтобы атаковать и защищать модели??
- Lucid - визуализировать
- define computation graph - позволяет автоматическое дифференцирование
- Nodes - operators, varibles, constants
- Edges - tensors
10.1. history
2.4.0
- MultiWorkerMirroredStrategy - no longer experimental
- TensorFlow Profiler now supports profiling `MultiWorkerMirroredStrategy`
10.2. terms
- batch
- weights and biases are only updated after all of the inputs and targets are presented
- epoch
- is one single pass over the entire training set
- train_step
- function that is called by fit() for every batch of data. Executes the forward pass with tf.GradientTape() and returns a dict mapping metric names to their current value (see the sketch after this list).
- Operations (Ops)
- high level operation on Tensor.
- Kernel
- implementation of an op tied to specific hardware/platform. Some ops have a one-to-one mapping from op to kernel while other ops use multiple kernels.
- Gradient / GradFunc
- The ‘backward mode’ definition of an op/kernel that computes the derivative of that function with regards to some input.
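A minimal train_step override sketch (illustrative model; assumes TF 2.x Keras, where compiled_loss/compiled_metrics exist):
import tensorflow as tf

class MyModel(tf.keras.Model):
    def __init__(self):
        super().__init__()
        self.dense = tf.keras.layers.Dense(1)

    def call(self, x):
        return self.dense(x)

    def train_step(self, data):
        x, y = data
        with tf.GradientTape() as tape:
            y_pred = self(x, training=True)        # forward pass
            loss = self.compiled_loss(y, y_pred)   # loss configured in compile()
        grads = tape.gradient(loss, self.trainable_variables)
        self.optimizer.apply_gradients(zip(grads, self.trainable_variables))
        self.compiled_metrics.update_state(y, y_pred)
        return {m.name: m.result() for m in self.metrics}

model = MyModel()
model.compile(optimizer="adam", loss="mse", metrics=["mae"])
model.fit(tf.random.normal((32, 4)), tf.random.normal((32, 1)), epochs=1, verbose=0)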
10.3. Features:
- Stable
- Well-documented sources
- Flexibility
- Portability
- Scalability
- Popularity
Cons:
- cannot be trained in a distributed way
- a metric tensor cannot be programmed
10.4. hello world
import tensorflow as tf
import timeit

# -- set device manually
try:
    gpus = tf.config.experimental.list_physical_devices('GPU')
    tf.config.set_visible_devices(gpus[0], 'GPU')
    logical_gpus = tf.config.list_logical_devices('GPU')
except RuntimeError as e:
    print(e)

# -- eager execution
# Note: steps through all of the program operations, needed or not.
a = tf.Variable([[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]], trainable=False)
b = tf.Variable([[1.0, 2.0, 3.0]], trainable=False)
k = a * b
print(k)

# -- graph execution
# Note: graph execution enables portability outside Python and tends to offer better performance.
# A graph consists of tf.Operation objects (units of computation) and tf.Tensor objects
# (units of data that flow between operations). Using a graph directly is deprecated.
# Graph execution only executes the operations necessary to produce the observable effects
# ("non-strict execution").
x = tf.random.uniform(shape=[10, 10], minval=-1, maxval=2, dtype=tf.dtypes.int32)

def power(x, y):
    result = tf.eye(10, dtype=tf.dtypes.int32)
    for _ in range(y):
        result = tf.matmul(x, result)
    return result

print("Eager execution:", timeit.timeit(lambda: power(x, 100), number=1000), "seconds")
power_as_graph = tf.function(power)
print("Graph execution:", timeit.timeit(lambda: power_as_graph(x, 100), number=1000), "seconds")
10.5. deployment
- TensorFlow Serving - serves models on servers, whether in-house or in the cloud; used within the TensorFlow
  Extended (TFX) end-to-end Machine Learning platform.
- deploy with static API.
- tightly integrated with Google Cloud via Vertex AI and integrates with Kubernetes and Docker.
- TensorFlow Lite - on mobile or IoT/embedded devices
- runs on Android and iOS, as well as microcontrollers (ARM with Bazel or CMake) and embedded Linux (e.g. a Coral device)
TFLite addresses 5 constraints for on-device Artificial Intelligence:
- latency, connectivity, privacy, size, and power consumption
10.6. ecosystem
- TensorFlow Hub https://www.tensorflow.org/hub
- Model Garden - source code for Hub models - Models and examples built with TensorFlow https://github.com/tensorflow/models
- the source code for SOTA models available
- Extended (TFX) TensorFlow's end-to-end platform for model deployment. https://www.tensorflow.org/tfx
- can use Apache Airflow/Beam or Kubernetes for orchestration
- tightly integrated with Google Cloud and can be used with Vertex AI Pipelines.
- Vertex AI - Google Cloud’s unified Machine Learning platform
- seeks to unify services into one platform
- MediaPipe framework for building multimodal, cross-platform applied Machine Learning pipelines https://mediapipe.dev/ https://google.github.io/mediapipe/
- Coral - local AI - offers an array of hardware products
- powerful Raspberry Pis with Edge TPUs
- TensorFlow.js - JavaScript library - to train and deploy models both in the browser and server-side with Node.js
- Cloud - allows you to connect your local environment to Google Cloud - https://www.tensorflow.org/cloud
- Colab
- Datasets https://research.google/tools/datasets/?ref=assemblyai.com
10.7. layers
- tf.Module - is the base class for both tf.keras.layers.Layer and tf.keras.Model
- tf.keras.layers.Layer
- tf.keras.Model
10.8. Eager vs Graph execution
Eager
- evaluate operations immediately
- do not build graphs
- operations return actual values instead of graphs to run later
Graph @tf.function, tf.Graph
- to accelerate your models.
- Graph - set of tf.Operation objects, which represent units of computation; and tf.Tensor objects, which represent the units of data that flow between operations.
- can be saved, run, and restored all without the original Python code.
- By default, Model.fit() will attempt to compile your model to a static graph
10.9. TF 2.0
- https://www.tensorflow.org/guide/effective_tf2?hl=ru
- https://medium.com/tensorflow/effective-tensorflow-2-0-best-practices-and-whats-changed-a0ca48767aff
- Chinese https://tf.wiki/en/basic/tools.html#graph-execution-mode-tf-function
API:
- tf.keras - High level API
- Eager execution by default, with "Gradient Tape"; @tf.function is required for optimization
https://www.tensorflow.org/guide/eager
- keras API Model subclassing https://www.tensorflow.org/guide/keras/custom_layers_and_models
- tf.data is going to replace tf.placeholders
- No more tf.Session()
10.9.1. tf.GradientTape API
for automatic differentiation using "reverse mode differentiation"
- resources held by a GradientTape are released as soon as GradientTape.gradient() method is called
- Trainable variables (created by tf.Variable or tf.compat.v1.get_variable, where trainable=True is default in both cases) are automatically watched.
- at least one of inputs is being "watched".
import tensorflow as tf

x = tf.constant(3.0)
with tf.GradientTape() as g:
    g.watch(x)
    y = x * x
dy_dx = g.gradient(y, x)
10.9.2. tf.function
TensorFlow graphs require static dtypes and shape dimensions. tf.function keeps a cache of concrete functions generated by tracing.
The trace_cache_key is a function of the dtype and shape of every Tensor argument and of the tf.device() scope; for a Python primitive it is the value itself. The key is used to determine whether a new graph needs to be created or a previously created graph can be invoked.
Notes:
- Can only use Tensor arguments.
- runs all stateful operations (e.g. tf.print)
Arguments must be either:
- Tensor (ndarrays are converted to the equivalent Tensor)
- list of Tensor
- arbitrary Python value
The main takeaways and recommendations are:
- Don't rely on Python side effects like object mutation or list appends.
- tf.function works best with TensorFlow ops, rather than NumPy ops or Python primitives.
- When in doubt, use the for x in y idiom.
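A small sketch of the tracing behaviour described above (dtype/shape drive the cache key; a Python primitive retraces per value):
import tensorflow as tf

@tf.function
def double(x):
    print("tracing with", x)   # Python side effect: runs only while tracing
    return x * 2

double(tf.constant(1))     # traces for an int32 scalar
double(tf.constant(2))     # same dtype/shape -> reuses the concrete function, no "tracing" print
double(tf.constant(1.5))   # new dtype -> retrace
double(3)                  # Python primitive -> the value itself becomes part of the cache key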
- wrap function
https://www.tensorflow.org/api_docs/python/tf/compat/v1/wrap_function
tf.compat.v1.wrap_function
- does not run all stateful operations (e.g. tf.cond)
- only trace the Python function once
from tensorflow_core.python.eager.wrap_function import WrappedFunction, VariableHolder, wrap_function
wf: WrappedFunction = wrap_function(f)

class WrappedFunction(function.ConcreteFunction):
    """Callable object encapsulating a function definition and its gradient."""
- AutoGraph is included in tf.function to convert if and for into tf.cond and tf.while.
10.9.3. migrate 1 to 2
- https://www.tensorflow.org/guide/migrate?hl=ru
- 2017 stratch https://ai.googleblog.com/2017/10/eager-execution-imperative-define-by.html
- https://colab.research.google.com/github/tensorflow/docs/blob/master/site/en/guide/migrate.ipynb
import tensorflow.compat.v1 as tf
tf.disable_v2_behavior()
- Eager execution, v1.enable_eager_execution() - tf.Graph will fail - wrap this code in a with tf.Graph().as_default() context.
- Resource variables, v1.enable_resource_variables() - 2.0 Resource variables are locked while being written to
- Tensor shapes, v1.enable_v2_tensorshape() - t.shape[0].value will fail
- Control flow, v1.enable_control_flow_v2()
10.9.4. custom layer
- https://www.tensorflow.org/tutorials/customization/custom_layers
- https://www.tensorflow.org/guide/keras/custom_layers_and_models
- Convolution https://github.com/basveeling/keras-gcnn/blob/master/keras_gcnn/layers/convolutional.py
Custom layers
Methods:
- __init__()
- build() - Called once from `__call__`, when we know the shapes of inputs and `dtype`.
- call()
Arguments __init__():
- trainable
- Boolean, whether the layer's variables should be trainable.
- name
- String name of the layer.
- dtype
- The dtype of the layer's computations and weights (default of `None` means use `tf.keras.backend.floatx` in TensorFlow 2, or the type of the first input in TensorFlow 1).
- dynamic
- Set this to `True` if your layer should only be run eagerly, and should not be used to generate a static computation graph. This would be the case for a Tree-RNN or a recursive network, for example, or generally for any layer that manipulates tensors using Python control flow. If `False`, we assume that the layer can safely be used to generate a static computation graph.
class Linear(layers.Layer):
    def __init__(self, units=32):
        super(Linear, self).__init__()
        self.units = units

    def build(self, input_shape):
        # all self.* variables are added to model.variables automatically
        self.w = self.add_weight(shape=(input_shape[-1], self.units),
                                 initializer='random_normal',
                                 trainable=True)
        self.b = self.add_weight(shape=(self.units,),
                                 initializer='random_normal',
                                 trainable=True)

    def call(self, inputs):
        return tf.matmul(inputs, self.w) + self.b
10.9.5. decayed learning rate
optimizer = SGD(learning_rate=0.006, decay=0.003, momentum=0.3)
lr = optimizer._decayed_lr(tf.float32)
print("lr: %f" % lr)
10.9.6. layer-wise learning rate in Tensorflow?
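The section is empty in the source; one possible approach (a sketch, not the only way) is to apply separate optimizers with different learning rates to different variable groups in a custom training loop. Layer names and rates below are illustrative:
import tensorflow as tf

model = tf.keras.Sequential([
    tf.keras.layers.Dense(64, activation="relu", name="backbone"),
    tf.keras.layers.Dense(10, name="head"),
])
model.build(input_shape=(None, 32))

opt_backbone = tf.keras.optimizers.Adam(1e-4)   # small LR for early layers
opt_head = tf.keras.optimizers.Adam(1e-2)       # larger LR for the head

x = tf.random.normal((8, 32))
y = tf.random.uniform((8,), maxval=10, dtype=tf.int32)

with tf.GradientTape() as tape:
    loss = tf.reduce_mean(
        tf.keras.losses.sparse_categorical_crossentropy(y, model(x), from_logits=True))

vars_backbone = model.get_layer("backbone").trainable_variables
vars_head = model.get_layer("head").trainable_variables
grads = tape.gradient(loss, vars_backbone + vars_head)
opt_backbone.apply_gradients(zip(grads[:len(vars_backbone)], vars_backbone))
opt_head.apply_gradients(zip(grads[len(vars_backbone):], vars_head))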
10.10. Save a model
- v1 https://cv-tricks.com/tensorflow-tutorial/save-restore-tensorflow-models-quick-complete-tutorial/
API
- tf.compat.v1.train.Saver - binary format. Not-object-based
- my_test_model-1000.index
- my_test_model-1000.meta
- my_test_model-1000.data-00000-of-00001
- checkpoint - keeps a record of latest checkpoint files saved
- tf.keras.Model
- tf.compat.v2.train.Checkpoint - binary object-based checkpoints
10.10.1. v1 Saver loading:
- https://www.tensorflow.org/api_docs/python/tf/compat/v1/train/import_meta_graph
- example https://github.com/ZZUTK/TensorFlow_VGG_train_test/blob/master/testing.py
steps
- with tf.compat.v1.Session() as sess: or tf.compat.v1.Session()
- saver = tf.compat.v1.train.import_meta_graph('my_test_model-1000.meta') # this will create the graph/network for you but we still need to load the value of the parameters that we had trained on this graph
- saver.restore(sess,tf.train.latest_checkpoint('./')) # restore the parameters of the network
- print(sess.run('w1:0')) - print saved value of w1.
Run:
graph = tf.compat.v1.get_default_graph()
w1 = graph.get_tensor_by_name("w1:0")
w2 = graph.get_tensor_by_name("w2:0")
feed_dict = {w1: 13.0, w2: 17.0}
op_to_restore = graph.get_tensor_by_name("op_to_restore:0")
print(sess.run(op_to_restore, feed_dict))
10.10.2. v2 saving loading
- Checkpoints - exact value of all parameters (tf.Variable) - source code required
- tf.keras.Model.save_weights(path/mymodel)
- Model.save(path) - the parameter values && serialized description of the computation defined by the model. Source code not needed.
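A minimal sketch of the two v2 options (paths are illustrative; assumes TF 2.x-style Keras saving):
import tensorflow as tf

model = tf.keras.Sequential([tf.keras.layers.Dense(1, input_shape=(4,))])

# checkpoint (weights only): the model-building source code is required to restore
model.save_weights("mymodel_ckpt")
model.load_weights("mymodel_ckpt")

# SavedModel: parameter values + serialized computation, no source code needed to reload
model.save("saved_model_dir")
restored = tf.keras.models.load_model("saved_model_dir")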
10.11. datasets
- tf.keras.datasets: https://www.tensorflow.org/api_docs/python/tf/keras/datasets
- boston_housing module
- cifar10 module
- cifar100 module
- fashion_mnist module
- imdb module
- mnist module
- reuters module
- tensorflow_datasets
tfds.load is a thin wrapper around tfds.core.DatasetBuilder
10.11.1. install and use tfds
pip install tensorflow-datasets
import tensorflow as tf
import tensorflow_datasets as tfds

tfds.display_progress_bar(True)

# 1) easy way
ds = tfds.load('mnist', split='train', shuffle_files=True)
assert isinstance(ds, tf.data.Dataset)
10.11.2. download
# create the required directory
from pathlib import Path
Path("/mnt/ssd/datasets/tensorflow_datasets/downloads/manual").mkdir(parents=True, exist_ok=True)

# test
# tfds.load('mnist', data_dir="/mnt/ssd/datasets/tensorflow_datasets")

import tensorflow_datasets as tfds
tfds.display_progress_bar(True)

# do not download 'robotics:mt_opt_rlds' and 'huggingface:wmt19'
l = [x for x in sorted(tfds.list_builders()) if ":" not in x]
errors = []
for x in l:
    try:
        ds = tfds.load(x, data_dir="/mnt/ssd/datasets/tensorflow_datasets")
    except Exception as e:
        errors.append(x)
print("datasets with errors:", errors)
10.11.3. landmark 2020
Number of unique landmark_id: 81313
import os import pandas as pd import tensorflow as tf import numpy as np # ------- data def get_paths(path="/landmark-retrieval-2020/train", max_count=-1): index = ["0","1","2","3","4","5","6","7","8","9","a","b","c","d","e","f"] paths = [] for a in index: for b in index: for c in index: paths.extend([path+f"/{a}/{b}/{c}/" + x for x in os.listdir(path+f"/{a}/{b}/{c}")]) if max_count > 0 and len(paths) > max_count: break return paths paths = get_paths("/landmark-retrieval-2020/train", 100) df = pd.read_csv("/landmark-retrieval-2020/train.csv") # count 1580470 # id landmark_id mapping = {} for path in paths: mapping[path.split('/')[-1].split('.')[0]] = path df['path'] = df['id'].map(mapping) # add path column df = df[~ df.path.isna()] # select records with "path" column # - add probability for ... alpha=0.6 counts_map = dict(df.groupby('landmark_id')['path'].agg(lambda x: len(x))) df['counts'] = df['landmark_id'].map(counts_map) df['prob'] = ( (1/df.counts**alpha) / (1/df.counts**alpha).max()).astype(np.float32) # ? uniques = df['landmark_id'].unique() # unique classes df['label'] = df['landmark_id'].map(dict(zip(uniques, range(len(uniques))))) # scale landmark_id to 0- image_paths, labels, probs = df.path.to_numpy(), df.label.to_numpy(), df.prob.to_numpy() def split_data(images, labels, train_size=0.9, shuffle=True): """ not stratified, train will have not all classes """ # 1. Get the total size of the dataset size = len(images) # 2. Make an indices array and shuffle it, if required indices = np.arange(size) if shuffle: np.random.shuffle(indices) # 3. Get the size of training samples train_samples = int(size * train_size) # 4. Split data into training and validation sets x_train, y_train = images[indices[:train_samples]], labels[indices[:train_samples]] x_valid, y_valid = images[indices[train_samples:]], labels[indices[train_samples:]] return x_train, x_valid, y_train, y_valid x_train, x_valid, y_train, y_valid = split_data(image_paths, labels) # --------- dataset class img_width = 736 img_height = 736 def encode_single_sample(img_path, label): print(img_path, label) # 1. Read image img = tf.io.read_file(img_path) # 2. Decode and convert to grayscale img = tf.io.decode_jpeg(img, channels=3) # 3. Convert to float32 in [0, 1] range img = tf.image.convert_image_dtype(img, tf.float32) # 4. Resize to the desired size img = tf.image.resize(img, [img_height, img_width]) # 5. Transpose the image because we want the time # dimension to correspond to the width of the image. img = tf.transpose(img, perm=[1, 0, 2]) # 7. Return a dict as our model is expecting two inputs return {"image": img, "label": label} train_dataset = tf.data.Dataset.from_tensor_slices((x_train.astype(str), y_train.astype(int))) train_dataset = train_dataset.map(encode_single_sample) valid_dataset = tf.data.Dataset.from_tensor_slices((x_valid.astype(str), y_valid.astype(int))) valid_dataset = valid_dataset.map(encode_single_sample) # dataset = dataset.map( # lambda x, y, p: (read_image(x), y, p), # tf.data.experimental.AUTOTUNE) # # anotehr approach: # train_list = glob.glob('../input/landmark-retrieval-2020/train/*/*/*/*') # test_list = glob.glob('../input/landmark-retrieval-2020/test/*/*/*/*') # index_list = glob.glob('../input/landmark-retrieval-2020/index/*/*/*/*') if __name__=="__main__": args = sys.argv[1:] print('args', args) main(args)
10.11.4. mnist
import tensorflow as tf

mnist = tf.keras.datasets.mnist
(x_train, y_train), (x_test, y_test) = mnist.load_data()
x_train, x_test = x_train / 255.0, x_test / 255.0

# -- dataset
batch_size = 16
train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
# if samples were stored as image files, a map step could decode them here, e.g.:
# def encode_single_sample(img_path, label):
#     img = tf.io.decode_jpeg(tf.io.read_file(img_path), channels=3)
#     return img, label
# train_dataset = train_dataset.map(encode_single_sample, num_parallel_calls=tf.data.AUTOTUNE)
train_dataset = train_dataset.shuffle(60000).repeat().batch(batch_size)
validation_dataset = tf.data.Dataset.from_tensor_slices((x_test, y_test))

# -- train
model.fit(train_dataset, epochs=5, steps_per_epoch=200)
10.12. tf.data.dataset
train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
a dataset element is a tuple (x, y) by default, but it may also be a dictionary
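A tiny sketch of both element structures (values are illustrative):
import tensorflow as tf

# tuple elements: (x, y)
ds_tuple = tf.data.Dataset.from_tensor_slices(([1.0, 2.0], [0, 1]))

# dict elements: {'image': ..., 'label': ...}
ds_dict = tf.data.Dataset.from_tensor_slices(
    {"image": [[1.0, 2.0], [3.0, 4.0]], "label": [0, 1]})

for elem in ds_dict.take(1):
    print(elem["image"].numpy(), elem["label"].numpy())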
10.12.1. test
for elem in train_dataset_y.take(10):
    print(elem.numpy().shape)
    # or: print(elem['label'].numpy().shape)

print(next(iter(train_dataset)))
10.13. install
see Tested build configurations tensorflow.org/install/source#linux
- apt clean; apt update; apt purge cuda; apt purge nvidia-*; apt autoremove
- install "cuda toolkit" from archive
- pip3 install tensorflow-gpu==2.3.0
10.14. install from source
TensorFlow is compiled with Google's Bazel build system
10.15. APIs
- tf.nn - very low level
- tf.layers - higher
- tf.keras - highest
- tf.enable_eager_execution() - simply computes immediately
10.16. tf.placeholder
amy = placeholder - tensors in the graph that are given the name amy
sess.run([tensors], feed_dict={amy: 1})  # fills the placeholders and evaluates the tensors
10.17. Logger = Disable
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'  # set before importing tensorflow
import tensorflow as tf
10.18. 4D tensor
- N refers to the number of images in a batch.
- H refers to the number of pixels in the vertical (height) dimension.
- W refers to the number of pixels in the horizontal (width) dimension.
- C refers to the channels. For example, 1 for black and white or grayscale and 3 for RGB.
Formats:
- NCHW or channels_first - optimal for NVIDIA GPUs cuDNN - If not using the Intel MKL, some operations are not supported on CPU when using NCHW
- NHWC or channels_last - TensorFlow default - little faster on CPU - we are working on tools to auto rewrite graphs to make switching between the formats transparent and take advantages of micro optimizations where a GPU op may be faster using NHWC
channels_last - default for keras
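A small sketch of converting between the two layouts with tf.transpose (shapes are illustrative):
import tensorflow as tf

x_nhwc = tf.random.normal((8, 224, 224, 3))        # N, H, W, C (channels_last)
x_nchw = tf.transpose(x_nhwc, perm=[0, 3, 1, 2])   # -> N, C, H, W (channels_first)
print(x_nchw.shape)                                # (8, 3, 224, 224)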
10.19. install
- pip install tensorflow --user
- import tensorflow as tf
- tf.InteractiveSession()
10.20. Deploy
- Java
- C
- Go
10.21. tensor
- https://www.tensorflow.org/guide/tensors
- Tensor a mathematical object analogous to but more general than a vector, represented by an array of components that are functions of the coordinates of a space
- unit of data, geometric objects that describe linear relations between geometric vectors, scalars, and other
tensors
- has Rank
- set of primitive values shaped into an array of any number of dimensions
- rank/dimension zero tensor - 5 - scalar - shape is []
- rank/dimension 1 tensor - [ 1., 2., 3., 4. ] - Vector - shape is [4]
- rank/dimension 2 tensor or a Matrix - shape [ 2, 4] - [ [ 1., 2., 3., 4. ], [ 5., 6., 7., 8. ] ]
A graph consists of interconnected op nodes that represent operations.
- An operation allocates memory for its outputs, which are available at the tensor-like endpoints :0, :1, etc.
10.22. hardware
A GPU can speed up network training by roughly 10-20x [1]
CPU
- with a sufficiently powerful video card the CPU hardly matters, because the GPU takes the whole load
- preferably Intel® Xeon®, Intel® Xeon Phi™
- with 2 video cards, the CPU must support both of them.
GPU
- two GPUs are about 20% better than one. Model portability to a system without a GPU is provided.
- CUDA-enabled NVIDIA video card https://developer.nvidia.com/cuda-gpus
- Deep Learning Primitives (cuDNN) - part of Deep Learning SDK, requires CUDA Toolkit
- GPU memory >= 11 GB - the more the better
- the more FLOPS the better
- top options: NVIDIA QUADRO® GV100 or NVIDIA TITAN RTX
- GPU cooling is very important - air cooling is fine for one or two GPUs if there is room for two more between them
RAM
- high RAM clock rates are not required
- RAM size larger than the GPU memory of a single card - more memory makes work more comfortable.
PSU: if you have 4 GPUs with each 250 watts TDP and a CPU with 150 watts TDP, then you will need a PSU with a minimum of 4×250 + 150 + 100 = 1250 watts
- TensorFlow for Deep Learning. Bharath Ramsundar, Reza Bosagh Zadeh. 2019.
- https://timdettmers.com/2018/12/16/deep-learning-hardware-guide/
- check that the motherboard is PCI-E 3.0
- can several GPUs be used with keras?
https://www.ferra.ru/review/computers/nvidia-geforce-gtx-1070-asus-gigabyte-msi-palit-zotac.htm
Video card choice: PALIT GeForce GTX 1070, 27030-29000 RUB
- https://belgorod.nix.ru/autocatalog/palit_graphics_accelerators/8Gb-PCI-E-GDDR5-Palit-GTX1070-JetStream-RTL-DVI-plus-HDMI-plus-3xDP-plus-SLI-GeForce-GTX1070_274136.html
- https://www.onlinetrade.ru/catalogue/videokarty-c338/palit/videokarta_palit_geforce_gtx_1070_1506mhz_pci_e_3.0_8192mb_8000mhz_256_bit_dvi_hdmi_hdcp_jetstream_ne51070015p2_1041j-556700.html?utm_source=market.yandex.ru&utm_medium=cpc&city=55&_openstat=bWFya2V0LnlhbmRleC5ydTvQktC40LTQtdC-0LrQsNGA0YLQsCBQQUxJVCBHZUZvcmNlIEdUWCAxMDcwIDE1MDZNaHogUENJLUUgMy4wIDgxOTJNYiA4MDAwTWh6IDI1NiBiaXQgRFZJIEhETUkgSERDUCBKZXRTdHJlYW0gKE5FNTEwNzAwMTVQMi0xMDQxSik7dVFPbk1jckprVlZZWmNEamR5UVBiUTs&ymclid=15602370129371193275200002
GeForce RTX 2060
10.23. hello world
import tensorflow as tf

a = tf.add(3, 5)
sess = tf.Session()
print(sess.run(a))
sess.close()
# or
with tf.Session() as sess:
    print(sess.run(a))
10.24. main objects
- tf.Session - contains one global graph
- tf.InteractiveSession - makes itself the default
- tf.Tensor
- tf.constant(value, dtype=None, shape=None, name='Const', verify_shape=False)
- stored in the graph definition
- loading graphs expensive
- tf.placeholder - input for graph
when constants are big
- tf.Operation
- tf.Graph - consists of tf.Tensor and tf.Operation instances.
- Multiple graphs require multiple sessions, each will try to use all available resources by default
- Can't pass data between them without passing them through python/numpy, which doesn't work in distributed
- It’s better to have disconnected subgraphs within one graph
- data types
- tf.int32
- tf.float32
- tf.float64
- tf.string
- tf.bool
10.25. Переменные
- tf.Variable - a container for a Tensor
- tf.assign
Initialization
init = tf.global_variables_initializer()
with tf.Session() as sess:
    sess.run(init)
Initialize a single variable
W = tf.Variable(tf.zeros([784, 10]))
with tf.Session() as sess:
    sess.run(W.initializer)
    print(W.eval())
10.26. TensorBoard
2 run it:
- $ python [yourprogram].py
- $ tensorboard --logdir="./graphs" --port 6006
- http://localhost:6006/
1 save it:
import tensorflow as tf

a = tf.constant(2, name="a")
b = tf.constant(3, name="b")
x = tf.add(a, b, name="add")
with tf.Session() as sess:
    # add this line to use TensorBoard
    writer = tf.summary.FileWriter('./graphs', sess.graph)
    print(sess.run(x))
writer.close()  # close the writer when you're done using it
10.27. GPU
https://www.tensorflow.org/install/gpu
- pip3 install tensorflow-gpu --user
Required:
- import tensorflow as tf
- config = tf.ConfigProto()
- config.gpu_options.allow_growth = True
- session = tf.Session(config=config)
10.28. keras
from tensorflow import keras
from tensorflow.python.keras.api._v2.keras.layers import BatchNormalization, Dense, Dropout, Activation, Flatten, \
    Conv2D, MaxPooling2D
from tensorflow.python.keras.api._v2.keras.models import Sequential
10.29. CNN
tf.nn.conv2d(feat,
- weight, - the filter (kernel) tensor
- strides=[1,1,1,1], - 1,2 or 4 - stride of the sliding window for each dimension of input
- padding="VALID")+bias
tf.nn.max_pool(feat,
- ksize=[1,2,2,1] - window per every dimension
- strides=[1,2,2,1]
- padding="VALID")
10.30. RNN and LSTM
- TODO https://www.tensorflow.org/guide/keras/rnn
- https://github.com/curiousily/Deep-Learning-For-Hackers
- https://github.com/aymericdamien/TensorFlow-Examples/blob/master/tensorflow_v2/notebooks/3_NeuralNetworks/recurrent_network.ipynb
stateful=True requires a constant batch_size
10.30.1. CNN
10.30.2. batch
https://machinelearningmastery.com/stateful-stateless-lstm-time-series-forecasting-python/
You can set RNN layers to be 'stateful', which means that the states computed for the samples in one batch will be reused as initial states for the samples in the next batch. This assumes a one-to-one mapping between samples in different successive batches.
You can specify the initial state of RNN layers symbolically by calling them with the keyword argument initial_state. The value of initial_state should be a tensor or list of tensors representing the initial state of the RNN layer.
You can specify the initial state of RNN layers numerically by calling reset_states with the keyword argument states. The value of states should be a numpy array or list of numpy arrays representing the initial state of the RNN layer.
it may be possible to simulate a stateful LSTM with a stateless LSTM using a large batch size.
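A small sketch of passing initial_state to a Keras LSTM (sizes are illustrative):
import tensorflow as tf

lstm = tf.keras.layers.LSTM(4, return_state=True)
inputs = tf.random.normal((2, 10, 8))   # batch, timesteps, features
init_h = tf.zeros((2, 4))
init_c = tf.zeros((2, 4))
output, state_h, state_c = lstm(inputs, initial_state=[init_h, init_c])
print(output.shape, state_h.shape, state_c.shape)   # (2, 4) (2, 4) (2, 4)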
10.31. plot learning curve
print(history.history.keys())  # ['loss', 'acc', 'val_loss', 'val_acc']
from matplotlib import pyplot as plt

plt.figure(1)
# summarize history for accuracy
plt.subplot(211)
plt.plot(history.history['acc'])
plt.plot(history.history['val_acc'])
plt.title('model accuracy')
plt.ylabel('accuracy')
# plt.xlabel('epoch')
plt.legend(['train', 'test'], loc='upper left')
# summarize history for loss
plt.subplot(212)
plt.plot(history.history['loss'])
plt.plot(history.history['val_loss'])
plt.title('model loss')
plt.ylabel('loss')
# plt.xlabel('epoch')
plt.legend(['train', 'test'], loc='upper left')
plt.show()
10.32. plot CNN layout
summaryWriter = tf.summary.FileWriter("model_name") summaryWriter.add_graph(sess.graph)
summaryWriter.add_summary(sess.run(summaryMeanTest0,feed_dict={testImagePH:testMean[0]}),i+1)
10.33. Optimizer
softmaxLoss = tf.losses.softmax_cross_entropy(onehot_labels=labelOnehot, logits=output) -> float or [batch]
- labelOnehot - the ground-truth one-hot labels
- logits - what the network returned
- reduction: str = Reduction.SUM_BY_NONZERO_WEIGHTS - default
- optimizer = tf.train.GradientDescentOptimizer(learning_rate).minimize(cost)
- cost - ?
- sess.run(tf.global_variables_initializer())
- sess.run([optim,loss],feed_dict=batch)
ways:
- minimize()
- opt = GradientDescentOptimizer(learning_rate=0.1)
- opt_op = opt.minimize(cost, var_list=<list of variables>) - computing the gradients and applying them to the variables
- sess.run([opt_op,loss], feed_dict=batch) or opt_op.run()
- compute_gradients() - process the gradients before applying them
- opt = GradientDescentOptimizer(learning_rate=0.1)
- grads_and_vars = opt.compute_gradients(loss, <list of variables>)
- capped_grads_and_vars = [(MyCapper(gv[0]), gv[1]) for gv in grads_and_vars]
- opt.apply_gradients(capped_grads_and_vars)
- sess.run([opt,loss], feed_dict=batch)
lrGP_PH, lrC_PH = tf.placeholder(tf.float32, shape=[]), tf.placeholder(tf.float32, shape=[])
optim = tf.train.AdamOptimizer(learning_rate=lrC_PH).minimize(loss, global_step=tf.train.get_global_step())
lrC = opt.lrC * opt.lrCdecay ** (i // opt.lrCstep)
batch[lrC_PH] = lrC
sess.run([optim, loss], feed_dict=batch)
10.34. models - tensorflow_models as tfm
- https://github.com/tensorflow/models/
- usage guide https://github.com/tensorflow/models/blob/master/tensorflow_models/tensorflow_models_pypi.ipynb
- usage examp https://colab.research.google.com/github/tensorflow/models/blob/master/docs/vision/image_classification.ipynb
- mnist legacy https://github.com/tensorflow/models/blob/e11f52948a993c8de15c4d87241044bc769e767b/official/legacy/image_classification/mnist_main.py
10.34.1. install
pip3 install tf-models-official==2.13
/usr/local/lib/python3.8/dist-packages
pip3 install tf-models-official==2.13 ; apt install -y emacs-nox
10.34.2. usage
git clone --depth=1 https://github.com/tensorflow/models
- Experiment factory (config in JSON/YAML) https://colab.research.google.com/github/tensorflow/models/blob/master/docs/vision/image_classification.ipynb#scrollTo=5iN8mHEJjKYE
- class constructor (tf.keras.Model) https://github.com/tensorflow/models/blob/master/tensorflow_models/tensorflow_models_pypi.ipynb
10.34.3. mnist
cd models/official/legacy/image_classification
python mnist_main.py
python mnist_main.py -ds parameter_server --data_dir /workspace/mnist
-ds,--distribution_strategy: The Distribution Strategy to use for training. Accepted values are 'off', 'one_device', 'mirrored', 'parameter_server', 'collective', case insensitive. 'off' means not to use Distribution Strategy; 'default' means to choose from `MirroredStrategy` or `OneDeviceStrategy` according to the number of GPUs. (default: 'mirrored')
-ng,--num_gpus: How many GPUs to use at each worker with the DistributionStrategies API. The default is 1. (default: '1') (an integer)
-te,--train_epochs: The number of epochs used to train. (default: '1') (an integer)
official.utils.flags._base: -bs,--batch_size: Batch size for training and evaluation. When using multiple gpus, this is the global batch size for all devices. For example, if the batch size is 32 and there are 4 GPUs, each GPU will get 8 examples on each step. (default: '1024') (an integer)
10.34.4. dummy dataset for MNIST
dummy_data = (
    tf.ones(shape=(10, 28, 28, 1), dtype=tf.int32),
    tf.range(10),
)
datasets = (
    tf.data.Dataset.from_tensor_slices(dummy_data),
    tf.data.Dataset.from_tensor_slices(dummy_data),
)
10.34.5. Mobilenet example
# https://www.tensorflow.org/api_docs/python/tfm/vision/backbones/MobileNet # https://stackoverflow.com/questions/63284471/tensorflow-use-model-inside-another-model-as-layer import tensorflow as tf import tensorflow_models as tfm from tensorflow.keras import Input from tensorflow.keras import Model IS = 28 INPUT_SIZE = (IS, IS) OUTPUT_SIZE = 2 input_specs = tf.keras.layers.InputSpec(shape=[None, IS, IS, 3]) sub_model = tfm.vision.backbones.MobileNet( input_specs=input_specs, filter_size_scale=0.65, ) def model_test(input_shape, sub_model): inputs = Input(input_shape) intermedio = sub_model(inputs) iv = list(intermedio.values()) f0 = tf.keras.layers.Flatten()(iv[0]) f1 = tf.keras.layers.Flatten()(iv[1]) f2 = tf.keras.layers.Flatten()(iv[2]) dense_intr = tf.keras.layers.Concatenate()([f0, f1, f2]) outputs = tf.keras.layers.Dense(OUTPUT_SIZE, activation=tf.keras.activations.softmax, name="d-out")(dense_intr) model = Model(inputs=inputs, outputs=outputs) return model model = model_test((IS, IS, 3), sub_model) model = model_test(INPUT_SIZE, sub_model) # -- inference with dummy test inputs = tf.keras.Input(shape=(IS, IS, 3), batch_size=1) endpoints = model(inputs=inputs) # -- compile model.compile(loss="categorical_crossentropy", optimizer="adam") # -- train print(model.name) print(endpoints)
10.34.6. RESNET example
# https://www.tensorflow.org/api_docs/python/tfm/vision/backbones/MobileNet # https://stackoverflow.com/questions/63284471/tensorflow-use-model-inside-another-model-as-layer import tensorflow as tf import tensorflow_models as tfm from tensorflow.keras import Input from tensorflow.keras import Model import os import pandas as pd import numpy as np IS = 736 INPUT_SIZE = (IS, IS, 3) OUTPUT_SIZE = None # lets get count of classes from Data BATCH_SIZE = 5 DROUPOUT_RATE=0.2 # ---- Data ---- def get_paths(path="/landmark-retrieval-2020/train", max_count=-1): index = ["0","1","2","3","4","5","6","7","8","9","a","b","c","d","e","f"] paths = [] for a in index: for b in index: for c in index: paths.extend([path+f"/{a}/{b}/{c}/" + x for x in os.listdir(path+f"/{a}/{b}/{c}")]) if max_count > 0 and len(paths) > max_count: break return paths paths = get_paths("/landmark-retrieval-2020/train", 150000) df = pd.read_csv("/landmark-retrieval-2020/train.csv") # count 1580470 # id landmark_id mapping = {} for path in paths: mapping[path.split('/')[-1].split('.')[0]] = path df['path'] = df['id'].map(mapping) # add path column df = df[~ df.path.isna()] # select records with "path" column # - add probability for ... alpha=0.6 counts_map = dict(df.groupby('landmark_id')['path'].agg(lambda x: len(x))) df['counts'] = df['landmark_id'].map(counts_map) df['prob'] = ( (1/df.counts**alpha) / (1/df.counts**alpha).max()).astype(np.float32) # ? # select classes where we have enough examples print("df[df.counts >70].shape", df[df.counts >70].shape) # >>> (4934, 5) df = df[df.counts >70] uniques = df['landmark_id'].unique() # unique classes OUTPUT_SIZE = len(uniques) df['label'] = df['landmark_id'].map(dict(zip(uniques, range(len(uniques))))) # scale landmark_id to 0- image_paths, labels, probs = df.path.to_numpy(), df.label.to_numpy(), df.prob.to_numpy() def split_data(images, labels, train_size=0.9, shuffle=True): # 1. Get the total size of the dataset size = len(images) # 2. Make an indices array and shuffle it, if required indices = np.arange(size) if shuffle: np.random.shuffle(indices) # 3. Get the size of training samples train_samples = int(size * train_size) # 4. 
Split data into training and validation sets x_train, y_train = images[indices[:train_samples]], labels[indices[:train_samples]] x_valid, y_valid = images[indices[train_samples:]], labels[indices[train_samples:]] return x_train, x_valid, y_train, y_valid x_train, x_valid, y_train, y_valid = split_data(image_paths, labels) # ----- Model ---- # -- sub_model - depend on model input_specs = tf.keras.layers.InputSpec(shape=[None, IS, IS, 3]) sub_model = tfm.vision.backbones.resnet.ResNet( model_id = 50, input_specs = input_specs, ) # -- Get outputs tensor of submodel inputs = tf.keras.Input(shape=INPUT_SIZE, batch_size=1) endpoints = sub_model(inputs=inputs) print("endpoints", endpoints) print() # -- wrap sub_model in new Model to add input and output layers def wrap_model(input_shape, sub_model): """ add inputs and outputs to model """ inputs = Input(input_shape) intermedio = sub_model(inputs) # """Merge outputs - depende on model""" pooling = tf.keras.layers.GlobalAveragePooling2D(name='head/pooling') # dropout = tf.keras.layers.Dropout(DROUPOUT_RATE, name='head/dropout') # dense = tf.keras.layers.Dense(dense_units, name='head/dense') # x = intermedio # x = pooling(x) # x = dropout(x) # x = dense(x) iv = list(intermedio.values()) f0 = tf.keras.layers.Flatten()(pooling(iv[0])) f1 = tf.keras.layers.Flatten()(pooling(iv[1])) f2 = tf.keras.layers.Flatten()(pooling(iv[2])) f3 = tf.keras.layers.Flatten()(pooling(iv[3])) x = tf.keras.layers.Concatenate()([f0, f1, f2, f3]) # final layout: outputs = tf.keras.layers.Dense(OUTPUT_SIZE, activation=tf.keras.activations.softmax, name="d-out")(x) model = Model(inputs=inputs, outputs=outputs) return model model = wrap_model(INPUT_SIZE, sub_model) model.summary() print("model.layers[0]._name", model.layers[0]._name) model.layers[0]._name = "image" # -- compile model.compile(loss="categorical_crossentropy", optimizer="adam") # ---- Dataset class ---- img_width = 736 img_height = 736 def encode_single_sample(img_path, label): # 1. Read image img = tf.io.read_file(img_path) # 2. Decode and convert to grayscale img = tf.io.decode_jpeg(img, channels=3) # 3. Convert to float32 in [0, 1] range img = tf.image.convert_image_dtype(img, tf.float32) # 4. Resize to the desired size img = tf.image.resize(img, [img_height, img_width]) # 5. Transpose the image because we want the time # dimension to correspond to the width of the image. img = tf.transpose(img, perm=[1, 0, 2]) # 7. 
Return a dict as our model is expecting two inputs # layer = tf.keras.layers.CategoryEncoding(num_tokens=OUTPUT_SIZE, output_mode="one_hot") label = tf.one_hot(label, OUTPUT_SIZE) return img, label train_dataset = tf.data.Dataset.from_tensor_slices((x_train.astype(str), y_train.astype(int))).skip(df.shape[0] - df.shape[0]//4) train_dataset = train_dataset.map(lambda x, y: encode_single_sample(x, y), tf.data.experimental.AUTOTUNE) train_dataset = train_dataset.batch(BATCH_SIZE).prefetch(100) validation_dataset = tf.data.Dataset.from_tensor_slices((x_valid.astype(str), y_valid.astype(int))).skip(df.shape[0] - df.shape[0]//4) validation_dataset = validation_dataset.map(lambda x, y: encode_single_sample(x, y), tf.data.experimental.AUTOTUNE) validation_dataset = train_dataset.prefetch(100) # ---- train ---- model.fit(train_dataset, epochs=1) # -- checks the model's performance print("evaluate") model.evaluate(validation_dataset, verbose=2) # -- inferece print("inference", x_valid[0], y_valid[0]) im, l = encode_single_sample(x_valid[0], y_valid[0]) im = tf.expand_dims(im, axis=0) print("im", im.shape) predictions = model.predict(im, batch_size=1) print(np.argmax(predictions)) print("label:", y_valid[0])
10.35. TensorFlow Serving
10.35.1. terms
- Servables - the underlying objects clients use to perform computation; a single server can handle multiple independent servables
- Loaders - manage a servable's life cycle
- Sources - are plugin modules that find and provide servables
- Managers - loading, serving, unloading
- main https://github.com/tensorflow/serving
- basic tutorial https://www.tensorflow.org/tfx/serving/serving_basic
- advanced tutorial https://www.tensorflow.org/tfx/serving/serving_advanced
kubernetes install https://github.com/tensorflow/serving/blob/master/tensorflow_serving/g3doc/serving_kubernetes.md
10.36. TODO TFX pipeline - MLOps
is a portable implementation of an ML workflow that can be run on various orchestrators, such as: Apache Airflow, Apache Beam, and Kubeflow Pipelines.
10.37. loss
- loss = tf.losses.softmax_cross_entropy(onehot_labels=labelOnehot, logits=output, reduction=tf.losses.Reduction.MEAN)
- lossm = tf.metrics.mean(loss)
10.38. ctc_loss
- https://programtalk.com/python-more-examples/tensorflow.nn.ctc_loss/
- https://github.com/lz1313/BlockCIrculantRNN/blob/master/model.py
- https://github.com/zfxxfeng/cnn_lstm_ctc_ocr_for_ICPR/blob/master/src/model.py
- https://github.com/mdangschat/ctc-asr/blob/master/asr/model.py
- https://github.com/nginyc/rafiki/blob/master/examples/models/speech_recognition/TfDeepSpeech.py
10.39. custom metric
levels:
- function -> returned per-batch values are summed and divided by the count (averaged)
- class -> gives full control
10.39.1. function
total categorical accuracy
def total_categorical_accuracy(y_true, y_pred):
    # a = tf.cast(tf.math.equal(tf.argmax(y_true, axis=-1), tf.argmax(y_pred, axis=-1)), dtype=y_pred.dtype)
    a = keras.metrics.categorical_accuracy(y_true, y_pred)
    classes = tf.constant(a.shape[1], a.dtype)
    a2 = tf.reduce_sum(a, axis=-1)
    c = tf.cast(tf.math.equal(a2, classes), dtype=classes.dtype)
    return c

model.compile(loss=loss, optimizer=opt.optimizer,
              metrics=["categorical_accuracy", total_categorical_accuracy])
10.39.2. class
class ConfusionMatrixMetric(tf.keras.metrics.Metric):
    def __init__(self, num_classes, name="confusion_matrix_metric", **kwargs):
        super().__init__(name=name, **kwargs)
        self.num_classes = num_classes
        self.total_cm = self.add_weight(
            "total_cm", shape=(num_classes, num_classes), initializer="zeros")

    def update_state(self, y_true, y_pred, sample_weight=None):
        self.total_cm.assign_add(self.confusion_matrix(y_true, y_pred))
        return self.total_cm

    def result(self):
        return self.process_confusion_matrix()

    def confusion_matrix(self, y_true, y_pred):
        """Make a confusion matrix"""
        y_pred = tf.argmax(y_pred, 1)
        cm = tf.math.confusion_matrix(y_true, y_pred, dtype=tf.float32,
                                      num_classes=self.num_classes)
        return cm

    def process_confusion_matrix(self):
        """returns precision, recall and f1 along with overall accuracy"""
        cm = self.total_cm
        diag_part = tf.linalg.diag_part(cm)
        precision = diag_part / (tf.reduce_sum(cm, 0) + tf.constant(1e-15))
        recall = diag_part / (tf.reduce_sum(cm, 1) + tf.constant(1e-15))
        f1 = 2 * precision * recall / (precision + recall + tf.constant(1e-15))
        return precision, recall, f1
10.40. distributed training
10.40.1. API
- tf.distribute.Strategy
- high-level API Keras Model.fit
- Custom training loop
- Estimator API (Limited Support)
Notes:
- Custom training loops: Eager mode is only recommended for debugging, in a graph recommended using tf.function (custom training loops)
10.40.2. terms
- replica
- copy of the model
- Parameter servers
- machines that hold a single copy of parameters/variables
- Replica context
- strategy.run function - when executing the computation function that is being replicated.
- Cross-replica context
- when you enter a strategy.scope
- Update context
- tf.distribute.StrategyExtended.update call
- Reductions
- method of aggregating multiple values into one value (sync training)
- All-reduce
- is an algorithm for performing a reduction on values from multiple devices and making the result available on all of those devices
- Mirrored variables
- variables that are created on multiple devices, where we keep the variables in sync by applying the same updates to every copy.
- Distribute-aware layers
- generally called in a replica context.
10.40.3. Synchronous vs asynchronous training
sync - via all-reduce
- workers train over different slices of input data (Data parallelism)
- aggregating gradients at each step
- the updates from each replica are aggregated together before updating the model variables
async - via parameter server architecture
- all workers are independently training over the input data and updating variables asynchronously
- each replica updates the model variables independently
groups:
- replicas partitioned into groups which are in sync within each group but async between groups.
10.40.4. strategies
MultiWorkerMirroredStrategy is very similar to MirroredStrategy. It implements synchronous distributed training across multiple workers, each with potentially multiple GPUs.
- MirroredStrategy
tf.distribute.MirroredStrategy
mirrors variables to multiple devices.
Each variable in the model is mirrored across all the replicas. These variables are kept in sync with each other by applying identical updates.
- kubeflow ex MultiWorkerMirroredStrategy
"""An example of multi-worker training with Keras model using Strategy API.""" from __future__ import absolute_import, division, print_function import argparse import json import os import tensorflow_datasets as tfds import tensorflow as tf from tensorflow.keras import layers, models def make_datasets_unbatched(): BUFFER_SIZE = 10000 # Scaling MNIST data from (0, 255] to (0., 1.] def scale(image, label): image = tf.cast(image, tf.float32) image /= 255 return image, label datasets, _ = tfds.load(name='mnist', with_info=True, as_supervised=True) return datasets['train'].map(scale).cache().shuffle(BUFFER_SIZE) def build_and_compile_cnn_model(): model = models.Sequential() model.add( layers.Conv2D(32, (3, 3), activation='relu', input_shape=(28, 28, 1))) model.add(layers.MaxPooling2D((2, 2))) model.add(layers.Conv2D(64, (3, 3), activation='relu')) model.add(layers.MaxPooling2D((2, 2))) model.add(layers.Conv2D(64, (3, 3), activation='relu')) model.add(layers.Flatten()) model.add(layers.Dense(64, activation='relu')) model.add(layers.Dense(10, activation='softmax')) model.summary() model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy']) return model def decay(epoch): if epoch < 3: #pylint: disable=no-else-return return 1e-3 if 3 <= epoch < 7: return 1e-4 return 1e-5 def main(args): # MultiWorkerMirroredStrategy creates copies of all variables in the model's # layers on each device across all workers # if your GPUs don't support NCCL, replace "communication" with another strategy = tf.distribute.MultiWorkerMirroredStrategy( communication_options=tf.distribute.experimental.CommunicationOptions(implementation=tf.distribute.experimental.CollectiveCommunication.AUTO)) BATCH_SIZE_PER_REPLICA = 64 BATCH_SIZE = BATCH_SIZE_PER_REPLICA * strategy.num_replicas_in_sync with strategy.scope(): ds_train = make_datasets_unbatched().batch(BATCH_SIZE).repeat() options = tf.data.Options() options.experimental_distribute.auto_shard_policy = \ tf.data.experimental.AutoShardPolicy.DATA ds_train = ds_train.with_options(options) # Model building/compiling need to be within `strategy.scope()`. multi_worker_model = build_and_compile_cnn_model() # Define the checkpoint directory to store the checkpoints checkpoint_dir = args.checkpoint_dir # Name of the checkpoint files checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt_{epoch}") # Function for decaying the learning rate. # You can define any decay function you need. # Callback for printing the LR at the end of each epoch. class PrintLR(tf.keras.callbacks.Callback): def on_epoch_end(self, epoch, logs=None): #pylint: disable=no-self-use print('\nLearning rate for epoch {} is {}'.format( epoch + 1, multi_worker_model.optimizer.lr.numpy())) callbacks = [ tf.keras.callbacks.TensorBoard(log_dir='./logs'), tf.keras.callbacks.ModelCheckpoint(filepath=checkpoint_prefix, save_weights_only=True), tf.keras.callbacks.LearningRateScheduler(decay), PrintLR() ] # Keras' `model.fit()` trains the model with specified number of epochs and # number of steps per epoch. Note that the numbers here are for demonstration # purposes only and may not sufficiently produce a model with good quality. multi_worker_model.fit(ds_train, epochs=10, steps_per_epoch=70, callbacks=callbacks) # Saving a model # Let `is_chief` be a utility function that inspects the cluster spec and # current task type and returns True if the worker is the chief and False # otherwise. 
def is_chief(): return TASK_INDEX == 0 if is_chief(): model_path = args.saved_model_dir else: # Save to a path that is unique across workers. model_path = args.saved_model_dir + '/worker_tmp_' + str(TASK_INDEX) multi_worker_model.save(model_path) if __name__ == '__main__': os.environ['NCCL_DEBUG'] = 'INFO' tfds.disable_progress_bar() # to decide if a worker is chief, get TASK_INDEX in Cluster info tf_config = json.loads(os.environ.get('TF_CONFIG') or '{}') TASK_INDEX = tf_config['task']['index'] parser = argparse.ArgumentParser() parser.add_argument('--saved_model_dir', type=str, required=True, help='Tensorflow export directory.') parser.add_argument('--checkpoint_dir', type=str, required=True, help='Tensorflow checkpoint directory.') parsed_args = parser.parse_args() main(parsed_args)
- CentralStorageStrategy (experimental)
tf.distribute.experimental.CentralStorageStrategy
puts all variables on a single device on the same machine (and does sync training).
- ParameterServerStrategy (experimental)
creates variables on the parameter servers.
api
- Model.fit
- custom training loop
- tf.distribute.experimental.ParameterServerStrategy (tensorflow 1.0)
- tf.distribute.ParameterServerStrategy
notes:
- data-parallel method
- All replicas that want to operate on a variable retrieve parameters/variables from Par server at the beginning of a step and send an update to be applied at the end of the step. These can in principle support either sync or async training, but right now we only have support for async training with parameter servers.
- workers and parameter servers
- Variables are created on parameter servers and they are read and updated by workers in each step
- workers read and update these variables independently without synchronizing with each other (asynchronous training)
- 'cluster' with several 'jobs', and each of the jobs may have one or more 'tasks'
recommended to have:
- One coordinator job (has the job name or task type: chief) - creates resources, dispatches training tasks, writes checkpoints, and deals with task failures.
- know the addresses and ports of all other TensorFlow servers, except the evaluator.
- Multiple worker jobs (job name or task type: worker)
- need to know which port they need to listen to.
- all workers should have the same number of GPUs available.
- each worker receives the same dataset, except when it is shuffled differently
- Multiple parameter server jobs (job name or task type: ps) - tf.distribute.Server
- need to know which port they need to listen to.
- evaluator (optional) -
worker and ps
- run tf.distribute.Server instances that listen for requests from the chief.
- dataset_fn will be wrapped into a tf.function and then executed on each worker to generate the data pipeline.
- apply the transformation inside the dataset_fn via tf.data.Dataset.map
datasets allowed to use:
- tf.data.Dataset
- tf.distribute.DistributedDataset
- tf.keras.utils.experimental.DatasetCreator - the code in dataset_fn will be invoked on the input device, which is usually the CPU, on each of the worker machines.
repeat and steps_per_epoch
- Dataset.repeat (repeats a dataset indefinitely when called without an argument) and specify the steps_per_epoch argument in the Model.fit call.
Note from TF (Model.fit):
- When using a `tf.keras.utils.experimental.DatasetCreator`, `steps_per_epoch`, `validation_steps`, `steps`,
or `pss_evaluation_shards` argument must be provided in `Model.fit`, `Model.evaluate`, or `Model.predict`
- validation_steps - for validation data
- pss_evaluation_shards - The number of shards should be at least the number of workers for good performance.
- tf.data.experimental.AutoShardPolicy
- OFF: No sharding will be performed.
- AUTO: Attempts FILE-based sharding, falling back to DATA-based sharding.
- FILE: Shards by input files (i.e. each worker will get a set of files to process). When this option is selected, make sure that there is at least as many files as workers. If there are fewer input files than workers, a runtime error will be raised.
- DATA: Shards by elements produced by the dataset. Each worker will process the whole dataset and discard the portion that is not for itself. Note that for this mode to correctly partition the dataset elements, the dataset needs to produce elements in a deterministic order.
- HINT: Looks for the presence of shard(SHARD_HINT, …) which is treated as a placeholder to replace with shard(num_workers, worker_index).
usage:
- options = tf.data.Options()
- options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.OFF
- train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
- train_dataset = train_dataset.with_options(options)
AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Did not find a shardable source, walked to a node which is not a dataset: name: "FlatMapDataset/_2"
- Evaluation
For users using Model.fit, Model.evaluate uses inline (distributed) evaluation under the hood.
- inline evaluation
- sidecar evaluation
- algorithm
explanation 2014 https://www.usenix.org/system/files/conference/osdi14/osdi14-paper-li_mu.pdf
useful to compare "parameter server" to more general-purpose distributed systems:
- which mandate synchronous, iterative communication - iterative MapReduce framework
- Distributed GraphLab - asynchronously schedules communication using a graph abstraction.
core goal of parameter server:
- preserving state between iterations
As before, we create copies of the model on all workers.
- data parallelism
- model and fit
https://www.tensorflow.org/api_docs/python/tf/keras/Model
- steps_per_epoch - Total number of steps (batches of samples) before declaring one epoch finished and starting the next epoch
- dataset
batches that straddle epoch boundaries
- repeat with no argument - infinity
- repeat + batch = batches that straddle epoch boundaries
- batch + repeat = clear epoch separation
- shuffle + repeat = show every element of one epoch before moving to the next
- repeat + shuffle = mixes the epoch boundaries together
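A tiny sketch showing the batch/repeat ordering difference on a 5-element dataset (illustrative):
import tensorflow as tf

ds = tf.data.Dataset.range(5)

# repeat + batch: batches straddle epoch boundaries
for b in ds.repeat(2).batch(3):
    print(b.numpy())   # [0 1 2] [3 4 0] [1 2 3] [4]

# batch + repeat: clear epoch separation
for b in ds.batch(3).repeat(2):
    print(b.numpy())   # [0 1 2] [3 4] [0 1 2] [3 4]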
- usage
# ---- who do what cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver() if cluster_resolver.task_type in ("worker", "ps"): # Start a TensorFlow server and wait. # Set the environment variable to allow reporting worker and ps failure to the # coordinator. This is a workaround and won't be necessary in the future. os.environ["GRPC_FAIL_FAST"] = "use_caller" server = tf.distribute.Server( cluster_resolver.cluster_spec(), job_name=cluster_resolver.task_type, task_index=cluster_resolver.task_id, protocol=cluster_resolver.rpc_layer or "grpc", start=True) server.join() elif cluster_resolver.task_type == "evaluator": # Run sidecar evaluation pass # note used else: # Run the coordinator. # ---- ParameterServerStrategy object. will use all the available GPUs on each worker NUM_PS=1 variable_partitioner = ( tf.distribute.experimental.partitioners.MinSizePartitioner( min_shard_bytes=(256 << 10), max_shards=NUM_PS)) strategy = tf.distribute.ParameterServerStrategy( cluster_resolver, variable_partitioner=variable_partitioner) # -- trivial model with strategy.scope(): # dataset_fn will be wrapped into a tf.function and then executed on each worker to generate the data pipeline. model = tf.keras.models.Sequential([tf.keras.layers.Dense(10)]) model.compile(tf.keras.optimizers.legacy.SGD(), loss="mse", steps_per_execution=10)
- usage working parameter server strategy for TF 2.0
import tensorflow as tf import os # ---- who do what cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver() # -- set GPU for worker def set_gpu(): gpus = tf.config.list_physical_devices('GPU') if gpus: # Restrict TensorFlow to only use the first GPU try: tf.config.set_visible_devices(gpus[0], 'GPU') logical_gpus = tf.config.list_logical_devices('GPU') print(len(gpus), "Physical GPUs,", len(logical_gpus), "Logical GPU") except RuntimeError as e: # Visible devices must be set before GPUs have been initialized print(e) if cluster_resolver.task_type in ("worker"): set_gpu() # -- wait for task for worker and ps if cluster_resolver.task_type in ("worker", "ps"): # Start a TensorFlow server and wait. # Set the environment variable to allow reporting worker and ps failure to the # coordinator. This is a workaround and won't be necessary in the future. os.environ["GRPC_FAIL_FAST"] = "use_caller" server = tf.distribute.Server( cluster_resolver.cluster_spec(), job_name=cluster_resolver.task_type, task_index=cluster_resolver.task_id, protocol=cluster_resolver.rpc_layer or "grpc", start=True) print("cluster_resolver.task_type", cluster_resolver.task_type) print("cluster_resolver.task_id", cluster_resolver.task_id) print("cluster_resolver.rpc_layer", cluster_resolver.rpc_layer or "grpc") server.join() elif cluster_resolver.task_type == "evaluator": # Run sidecar evaluation pass # note used else: # Run the coordinator. # ---- ParameterServerStrategy object. will use all the available GPUs on each worker NUM_PS=1 variable_partitioner = ( tf.distribute.experimental.partitioners.MinSizePartitioner( min_shard_bytes=(256 << 10), max_shards=NUM_PS)) strategy = tf.distribute.ParameterServerStrategy( cluster_resolver, variable_partitioner=variable_partitioner) # -- data mnist = tf.keras.datasets.mnist (x_train, y_train), (x_test, y_test) = mnist.load_data() x_train, x_test = x_train / 255.0, x_test / 255.0 # -- trivial model with strategy.scope(): # dataset_fn will be wrapped into a tf.function and then executed on each worker to generate the data pipeline. # -- Dataset TF class batch_size=16 train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train)) train_dataset = train_dataset.shuffle(60000).repeat().batch(batch_size) validation_dataset = tf.data.Dataset.from_tensor_slices((x_test, y_test)) validation_dataset = validation_dataset.shuffle(60000).batch(batch_size) # -- model model = tf.keras.models.Sequential([ tf.keras.layers.Flatten(input_shape=(28, 28)), tf.keras.layers.Dense(128, activation='relu'), tf.keras.layers.Dropout(0.2), tf.keras.layers.Dense(10) ]) loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True) model.compile(optimizer='adam', loss=loss_fn, metrics=['accuracy'], pss_evaluation_shards='auto') # -- train model.fit(train_dataset, epochs=5, steps_per_epoch=300) # -- save model.save('aa.keras', overwrite=True, save_format="tf") # The file needs to end with the .keras extension model = tf.keras.models.load_model('aa.keras') # -- checks the model's performance model.evaluate(validation_dataset, verbose=2) # -- inferece predictions = model(x_train[:1]).numpy() import numpy as np print(np.argmax(predictions)) print(y_train[:1])
- usage working parameter server strategy for TF 2.0 v2
- usage 3: dataset creator (several params commented out)
import tensorflow as tf import os # ---- who do what cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver() # -- set GPU for worker def set_gpu(): gpus = tf.config.list_physical_devices('GPU') if gpus: # Restrict TensorFlow to only use the first GPU try: tf.config.set_visible_devices(gpus[0], 'GPU') logical_gpus = tf.config.list_logical_devices('GPU') print(len(gpus), "Physical GPUs,", len(logical_gpus), "Logical GPU") except RuntimeError as e: # Visible devices must be set before GPUs have been initialized print(e) if cluster_resolver.task_type in ("worker"): set_gpu() # -- wait for task for worker and ps if cluster_resolver.task_type in ("worker", "ps"): # Start a TensorFlow server and wait. # Set the environment variable to allow reporting worker and ps failure to the # coordinator. This is a workaround and won't be necessary in the future. os.environ["GRPC_FAIL_FAST"] = "use_caller" server = tf.distribute.Server( cluster_resolver.cluster_spec(), job_name=cluster_resolver.task_type, task_index=cluster_resolver.task_id, protocol=cluster_resolver.rpc_layer or "grpc", start=True) print("cluster_resolver.task_type", cluster_resolver.task_type) print("cluster_resolver.task_id", cluster_resolver.task_id) print("cluster_resolver.rpc_layer", cluster_resolver.rpc_layer or "grpc") server.join() elif cluster_resolver.task_type == "evaluator": # Run sidecar evaluation pass # note used else: # Run the coordinator. # def dataset_fn(input_context): # dataset = dataset.map(preprocessing_layer) # return dataset # dataset_creator = tf.keras.utils.experimental.DatasetCreator(dataset_fn) # ---- ParameterServerStrategy object. will use all the available GPUs on each worker NUM_PS=1 variable_partitioner = ( tf.distribute.experimental.partitioners.MinSizePartitioner( min_shard_bytes=(256 << 10), max_shards=NUM_PS)) strategy = tf.distribute.ParameterServerStrategy( cluster_resolver, variable_partitioner=variable_partitioner) # -- data mnist = tf.keras.datasets.mnist (x_train, y_train), (x_test, y_test) = mnist.load_data() x_train, x_test = x_train / 255.0, x_test / 255.0 # -- trivial model with strategy.scope(): # dataset_fn will be wrapped into a tf.function and then executed on each worker to generate the data pipeline. # -- Dataset TF class train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train)) validation_dataset = tf.data.Dataset.from_tensor_slices((x_test, y_test)) # -- model model = tf.keras.models.Sequential([ tf.keras.layers.Flatten(input_shape=(28, 28)), tf.keras.layers.Dense(128, activation='relu'), tf.keras.layers.Dropout(0.2), tf.keras.layers.Dense(10) ]) loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True) # steps_per_execution=10, , pss_evaluation_shards='auto' model.compile(optimizer='adam', loss=loss_fn, metrics=['accuracy']) # -- train model.fit(x_train, y_train, epochs=5, steps_per_epoch=3) model.fit(train_dataset, epochs=5, steps_per_epoch=3000) # -- checks the model's performance model.evaluate(validation_dataset, verbose=2) # # -- inferece # predictions = model(x_train[:1]).numpy() # import numpy as np # print(np.argmax(predictions)) # print(y_train[:1])
- mnist last version
# Disable all GPUs. This prevents errors caused by all workers trying to use the same GPU. In a real-world application, each worker would be on a different machine. # import os # os.environ["CUDA_VISIBLE_DEVICES"] = "-1" import tensorflow as tf import os import logging import multiprocessing tf.get_logger().setLevel(logging.DEBUG) # ---- who do what cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver() # -- set GPU for worker def set_gpu(): gpus = tf.config.list_physical_devices('GPU') if gpus: # Restrict TensorFlow to only use the first GPU try: for device in gpus: tf.config.experimental.set_memory_growth(device, True) # tf.config.set_logical_device_configuration( # gpus[0], # [tf.config.LogicalDeviceConfiguration(memory_limit=3024)]) gpu_devices = tf.config.experimental.list_physical_devices('GPU') tf.config.set_visible_devices(gpus[0], 'GPU') logical_gpus = tf.config.list_logical_devices('GPU') print() print(len(gpus), "Physical GPUs,", len(logical_gpus), "Logical GPU") print() cpu_ph = tf.config.list_physical_devices('CPU') cpu_lg = tf.config.list_logical_devices('CPU') print(len(cpu_ph), "Physical CPUs,", len(cpu_lg), "Logical CPU") except RuntimeError as e: # Visible devices must be set before GPUs have been initialized print(e) # if cluster_resolver.task_type in ("worker", "ps"): set_gpu() # for all # -- wait for task for worker and ps if cluster_resolver.task_type in ("worker", "ps"): # Start a TensorFlow server and wait. # Set the environment variable to allow reporting worker and ps failure to the # coordinator. This is a workaround and won't be necessary in the future. os.environ["GRPC_FAIL_FAST"] = "use_caller" # # Workers need some inter_ops threads to work properly. worker_config = tf.compat.v1.ConfigProto(device_count={'GPU': 1, 'CPU':1}) if cluster_resolver.task_type in ("worker"): NUM_WORKERS=len(cluster_resolver.cluster_spec().job_tasks('worker')) if multiprocessing.cpu_count() < NUM_WORKERS + 1: worker_config.inter_op_parallelism_threads = NUM_WORKERS + 1 server = tf.distribute.Server( cluster_resolver.cluster_spec(), job_name=cluster_resolver.task_type, task_index=cluster_resolver.task_id, config=worker_config, protocol=cluster_resolver.rpc_layer or "grpc", start=True) print("cluster_resolver.task_type", cluster_resolver.task_type) print("cluster_resolver.task_id", cluster_resolver.task_id) print("cluster_resolver.rpc_layer", cluster_resolver.rpc_layer or "grpc") print("server.default_session_config", server.server_def.default_session_config) print() server.join() elif cluster_resolver.task_type == "evaluator": # Run sidecar evaluation pass # note used else: # Run the coordinator. # ---- ParameterServerStrategy object. will use all the available GPUs on each worker NUM_PS=len(cluster_resolver.cluster_spec().job_tasks('ps')) variable_partitioner = ( tf.distribute.experimental.partitioners.MinSizePartitioner( min_shard_bytes=(256 << 10), max_shards=NUM_PS)) strategy = tf.distribute.ParameterServerStrategy( cluster_resolver, variable_partitioner=variable_partitioner) # -- data mnist = tf.keras.datasets.mnist (x_train, y_train), (x_test, y_test) = mnist.load_data() x_train, x_test = x_train / 255.0, x_test / 255.0 # -- trivial model with strategy.scope(): # dataset_fn will be wrapped into a tf.function and then executed on each worker to generate the data pipeline. # with tf.device('/device:GPU:0'): batch_size=32 # -- Dataset TF class train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train)) # suppress warning at worker, maybe fix error. 
options = tf.data.Options() options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.DATA train_dataset = train_dataset.with_options(options) train_dataset = train_dataset.shuffle(600).repeat().batch(batch_size).prefetch(300) train_dataset = strategy.experimental_distribute_dataset(train_dataset) validation_dataset = tf.data.Dataset.from_tensor_slices((x_test, y_test)) validation_dataset = validation_dataset.shuffle(600).batch(batch_size) # -- model model = tf.keras.models.Sequential([ tf.keras.layers.Flatten(input_shape=(28, 28)), tf.keras.layers.Dense(400, activation='relu'), # tf.keras.layers.Dense(3420, activation='relu'), # tf.keras.layers.Dense(3420, activation='relu'), tf.keras.layers.Dropout(0.2), tf.keras.layers.Dense(10) ]) loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True) model.compile(optimizer='adam', loss=loss_fn, metrics=['accuracy'], # not required: pss_evaluation_shards='auto' ) # print model model.summary() # -- train model.fit(train_dataset, epochs=5, steps_per_epoch=300) # -- save model.save('aa.keras', overwrite=True, save_format="tf") # The file needs to end with the .keras extension model = tf.keras.models.load_model('aa.keras') # -- checks the model's performance model.evaluate(validation_dataset, verbose=2) # -- inferece predictions = model(x_train[:1]).numpy() import numpy as np print(np.argmax(predictions)) print(y_train[:1])
- resnet
pip3 install tf-models-official==2.13 ; apt install -y emacs-nox
# Disable all GPUs. This prevents errors caused by all workers trying to use the same GPU. In a real-world application, each worker would be on a different machine. # import os # os.environ["CUDA_VISIBLE_DEVICES"] = "-1" import tensorflow as tf import os import logging import multiprocessing tf.get_logger().setLevel(logging.DEBUG) # ---- who do what cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver() # -- set GPU for worker def set_gpu(): gpus = tf.config.list_physical_devices('GPU') if gpus: # Restrict TensorFlow to only use the first GPU try: for device in gpus: tf.config.experimental.set_memory_growth(device, True) # tf.config.set_logical_device_configuration( # gpus[0], # [tf.config.LogicalDeviceConfiguration(memory_limit=3024)]) gpu_devices = tf.config.experimental.list_physical_devices('GPU') tf.config.set_visible_devices(gpus[0], 'GPU') logical_gpus = tf.config.list_logical_devices('GPU') print() print(len(gpus), "Physical GPUs,", len(logical_gpus), "Logical GPU") print() cpu_ph = tf.config.list_physical_devices('CPU') cpu_lg = tf.config.list_logical_devices('CPU') print(len(cpu_ph), "Physical CPUs,", len(cpu_lg), "Logical CPU") except RuntimeError as e: # Visible devices must be set before GPUs have been initialized print(e) # if cluster_resolver.task_type in ("worker", "ps"): set_gpu() # for all # -- wait for task for worker and ps if cluster_resolver.task_type in ("worker", "ps"): # Start a TensorFlow server and wait. # Set the environment variable to allow reporting worker and ps failure to the # coordinator. This is a workaround and won't be necessary in the future. os.environ["GRPC_FAIL_FAST"] = "use_caller" # # Workers need some inter_ops threads to work properly. worker_config = tf.compat.v1.ConfigProto(device_count={'GPU': 1, 'CPU':1}) if cluster_resolver.task_type in ("worker"): NUM_WORKERS=len(cluster_resolver.cluster_spec().job_tasks('worker')) if multiprocessing.cpu_count() < NUM_WORKERS + 1: worker_config.inter_op_parallelism_threads = NUM_WORKERS + 1 server = tf.distribute.Server( cluster_resolver.cluster_spec(), job_name=cluster_resolver.task_type, task_index=cluster_resolver.task_id, config=worker_config, protocol=cluster_resolver.rpc_layer or "grpc", start=True) print("cluster_resolver.task_type", cluster_resolver.task_type) print("cluster_resolver.task_id", cluster_resolver.task_id) print("cluster_resolver.rpc_layer", cluster_resolver.rpc_layer or "grpc") print("server.default_session_config", server.server_def.default_session_config) print() server.join() elif cluster_resolver.task_type == "evaluator": # Run sidecar evaluation pass # note used else: # Run the coordinator. # ---- ParameterServerStrategy object. 
will use all the available GPUs on each worker NUM_PS=len(cluster_resolver.cluster_spec().job_tasks('ps')) variable_partitioner = ( tf.distribute.experimental.partitioners.MinSizePartitioner( min_shard_bytes=(256 << 10), max_shards=NUM_PS)) strategy = tf.distribute.ParameterServerStrategy( cluster_resolver, variable_partitioner=variable_partitioner) # --------------------------------------------------------------------------------------------------- # ----------------------- Model, Dataset, Training -------------------------------------------------- with strategy.scope() from importlib import reload reload("./resnet-model-and-data.py") # ------------ Part require modification for ParameterServer strategy train_dataset = tf.data.Dataset.from_tensor_slices((x_train.astype(str), y_train.astype(int))).skip(df.shape[0] - df.shape[0]//4) train_dataset = train_dataset.map(lambda x, y: encode_single_sample(x, y), tf.data.experimental.AUTOTUNE) train_dataset = train_dataset.batch(BATCH_SIZE).prefetch(100) validation_dataset = tf.data.Dataset.from_tensor_slices((x_valid.astype(str), y_valid.astype(int))).skip(df.shape[0] - df.shape[0]//4) validation_dataset = validation_dataset.map(lambda x, y: encode_single_sample(x, y), tf.data.experimental.AUTOTUNE) validation_dataset = train_dataset.prefetch(100) # ---- train ---- model.fit(train_dataset, epochs=1) # -- checks the model's performance print("evaluate") model.evaluate(validation_dataset, verbose=2) # -- inferece print("inference", x_valid[0], y_valid[0]) im, l = encode_single_sample(x_valid[0], y_valid[0]) im = tf.expand_dims(im, axis=0) print("im", im.shape) predictions = model.predict(im, batch_size=1) print(np.argmax(predictions)) print("label:", y_valid[0])
- Variable sharding
for very large embeddings that may not fit in a single machine's memory
- TF_CONFIG
'TF_CONFIG' environment variable if you use TFConfigClusterResolver.
- logging steps
train_step = model.train_step

def my_train_step(data):
    tf.print("step:", model._train_counter)
    return train_step(data)

model.train_step = my_train_step
- troubleshooting
- after 1 epoch - TensorFlow device GPU:0 was not registered
auto_shard.cc: AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
- https://stackoverflow.com/questions/72740907/tensorflow-cant-apply-sharing-policy-file-when-using-mirrored-strategy
- dataset = # some dataset
- options = tf.data.Options()
- options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.FILE
- dataset = dataset.with_options(options) # use this as input for your model
"Attempting to perform BLAS operation using StreamExecutor without BLAS support" error occurs
- tf.config.set_logical_device_configuration(gpus[0],[tf.config.LogicalDeviceConfiguration(memory_limit=1024)])
NOT_FOUND: TensorFlow device GPU:1 was not registered - several times after start
- all pods should have equal amount of GPU:
- in YAML: resources: limits: nvidia.com/gpu: 1
- tf.compat.v1.ConfigProto(device_count={'GPU': 1, 'CPU':1}) - for all pods
SessionOptions: device_count{key: "CPU", value:1,}, device_count{key: "GPU", value:0,}
- enable GPU at chief and PS
Successful NUMA node read from SysFS had negative value (-1)
- https://gist.github.com/zrruziev/b93e1292bf2ee39284f834ec7397ee9f
- apt install pciutils
- links
- https://www.tensorflow.org/tutorials/distribute/parameter_server_training
- article tfv1.0 https://support.huawei.com/enterprise/en/doc/EDOC1100164821/704ae7ed/distributed-training-based-on-the-ps-worker-architecture
- keras faq https://keras.io/getting_started/faq/#how-can-i-distribute-training-across-multiple-machines
- keras distrib https://keras.io/guides/distributed_training/
- TF input for distributed training https://www.tensorflow.org/tutorials/distribute/input
- ps example https://github.com/tensorflow/tensorflow/issues/57694
- explanation 2014 https://www.usenix.org/system/files/conference/osdi14/osdi14-paper-li_mu.pdf
- https://habr.com/ru/companies/wunderfund/articles/663104/
10.40.5. TF_CONFIG
'TF_CONFIG' environment variable is a JSON string
- what tasks constitute a cluster
- their addresses
- each task's role in the cluster
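A minimal sketch of such a JSON string, assuming a toy cluster on localhost; the addresses, ports and the chosen role are placeholders, and in a real cluster an orchestrator (e.g. Kubernetes) sets this variable per pod:
import json
import os

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "chief": ["localhost:2222"],
        "worker": ["localhost:2223"],
        "ps": ["localhost:2224"],
    },
    # this process's role in the cluster
    "task": {"type": "chief", "index": 0},
})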
10.40.6. data sharding
- https://www.tensorflow.org/tutorials/distribute/keras
- https://www.tensorflow.org/tutorials/distribute/input
- https://www.tensorflow.org/guide/distributed_training
tf.data.experimental.AutoShardPolicy
- AUTO or FILE - tf.data.Dataset that reads from files.
Note: tf.data.experimental.AutoShardPolicy.FILE - the actual per-step batch size may be smaller than the one you defined for the global batch size - when the remaining elements in the file are less than the global batch size
10.40.7. links
10.40.8. monitor
- from ChatGPT:
- TensorFlow Extended (TFX): TFX provides a comprehensive end-to-end pipeline for building,
training, and deploying machine learning models. It includes components for monitoring the model training process and tracking model metrics during training.
- TensorBoard: TensorBoard is a web-based tool provided by TensorFlow that allows you to
visualize and monitor various aspects of your model training, such as loss, accuracy, and computational graphs. It can be integrated with Kubernetes to monitor the training process running on the cluster.
- Kubernetes Dashboard: The Kubernetes dashboard is a web-based user interface that provides a
visual representation of the cluster, including information about deployments, pods, jobs, and other resources. It can be used to monitor the status and progress of the neural network training on the Kubernetes cluster.
- Prometheus and Grafana: Prometheus is a popular open-source monitoring and alerting platform
that can be used to collect and store metrics from your TensorFlow cluster. Grafana is a visualization and analytics tool that can be integrated with Prometheus to create customizable dashboards for monitoring and analyzing training metrics.
- KubeFlow: KubeFlow is an open-source project that provides a platform for end-to-end machine
learning workflows on Kubernetes. It includes components for model training, hyperparameter tuning, model packaging, and serving. KubeFlow also provides monitoring capabilities to track the progress of your model training and performance metrics.
- TODO tensorboard
10.41. toy model MNIST
#+NAME https://www.tensorflow.org/tutorials/distribute/multi_worker_with_keras
import tensorflow as tf
import numpy as np

mnist = tf.keras.datasets.mnist
(x_train, y_train), (x_test, y_test) = mnist.load_data()
x_train, x_test = x_train / 255.0, x_test / 255.0

# -- dataset
batch_size = 16
train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
train_dataset = train_dataset.shuffle(60000).repeat().batch(batch_size)
validation_dataset = tf.data.Dataset.from_tensor_slices((x_test, y_test))

# -- model
model = tf.keras.models.Sequential([
    tf.keras.layers.Flatten(input_shape=(28, 28)),
    tf.keras.layers.Dense(128, activation='relu'),
    tf.keras.layers.Dropout(0.2),
    tf.keras.layers.Dense(10)
])
loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
model.compile(optimizer='adam', loss=loss_fn, metrics=['accuracy'])

# -- train
# model.fit(x_train, y_train, epochs=5)
model.fit(train_dataset, epochs=5, steps_per_epoch=200)

# -- check the model's performance
model.evaluate(x_test, y_test, verbose=2)

# -- inference
predictions = model(x_train[:1]).numpy()
print(np.argmax(predictions))
print(y_train[:1])
10.42. logging
https://stackoverflow.com/questions/40559667/how-to-redirect-tensorflow-logging-to-a-file
tf.keras.utils.enable_interactive_logging() When interactive logging is enabled, Keras displays logs via stdout. This provides the best experience when using Keras in an interactive environment such as a shell or a notebook.
tensor:
- tf.debugging
- tf.print
log:
- tf.get_logger() returns logging.getLogger('tensorflow')
10.42.1. standard way
10.42.2. pipe
the script command allows capturing the full output
script -c 'python -i <<< "print \"test\""'
if output freezes when piped (e.g. through tee): disable buffering:
- sed -u
- grep --line-buffered
- perl -ne 'use IO::Handle ; printf "%s %s", scalar time(), $_ ; STDOUT->autoflush(1)'
10.42.3. logging
import logging

log = logging.getLogger('tensorflow')
log.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
fh = logging.FileHandler('tensorflow.log')
fh.setLevel(logging.DEBUG)
fh.setFormatter(formatter)
log.addHandler(fh)
10.43. callbacks for model.fit
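A minimal sketch of commonly used built-in Keras callbacks passed to model.fit; the file paths, patience and log_dir values are arbitrary placeholders:
import tensorflow as tf

callbacks = [
    # keep the best model seen so far (path is a placeholder)
    tf.keras.callbacks.ModelCheckpoint("best.keras", monitor="val_loss", save_best_only=True),
    # stop training when val_loss stops improving
    tf.keras.callbacks.EarlyStopping(monitor="val_loss", patience=3, restore_best_weights=True),
    # write logs for TensorBoard
    tf.keras.callbacks.TensorBoard(log_dir="./logs"),
]

# model.fit(train_dataset, epochs=10, validation_data=validation_dataset, callbacks=callbacks)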
10.44. USE CASES
10.44.1. TF 2.0 convert mode h5 to weight and arch
import os
import time
from tensorflow import keras
from tensorflow.keras.models import Model

# use CPU
os.environ['CUDA_VISIBLE_DEVICES'] = '-1'

# parent_path = os.path.join(os.getcwd(), os.pardir)
model_path = '/mnt/hit4/hit4user/PycharmProjects/cnn/text_or_not/saved_models/cnn_trained_model2020-09-10 09:26:34.553480.h5'
print(model_path)
model: Model = keras.models.load_model(model_path)

name = 'cnn_trained_model2020-09-10 09:26:34.553480'
os.mkdir(name)
with open("./" + name + "/model_to_json.json", "w") as json_file:
    json_file.write(model.to_json(indent=4))
model.save_weights('./' + name + '/')
print("ok")
time.sleep(1)
10.44.2. imbalanced dataset
strategy:
- oversample min to half of max
- apply class_weight
- class_weight
for binary:
weight_for_0 = (1 / neg) * (total / 2.0)
weight_for_1 = (1 / pos) * (total / 2.0)
class_weight = {0: weight_for_0, 1: weight_for_1}
for n-classes:
n_samples / (n_classes * np.bincount(y))
- n_samples is the total number of instances
- n_classes is the number of classes
- np.bincount(y) is an array of the number of instances in each class
apply weights:
classes = sorted(set(y))
n_classes = len(classes)
n_samples = len(y)
weights = n_samples / (n_classes * np.bincount(y))
class_weight = {c: w for c, w in zip(classes, weights)}
model.fit(..., class_weight=class_weight)
import numpy as np

y = [0]*5 + [1]*2 + [2]*5
y = np.array(y)
x = np.array(list(range(len(y))))
xy = np.vstack([x, y]).transpose()
# print(xy)
classes = sorted(set(y))
n_classes = len(classes)
n_samples = len(xy)
print(n_samples)
print(n_classes)
print(np.bincount(y))
weights = n_samples / (n_classes * np.bincount(y))
class_weight = {c: w for c, w in zip(classes, weights)}
print(class_weight)
12 3 [5 2 5] {0: 0.8, 1: 2.0, 2: 0.8}
import numpy as np

y = [0]*100 + [1]*10 + [2]*300
u = sorted(set(y))
n_classes = 3
n_samples = len(y)
w = n_samples / (n_classes * np.bincount(y))
class_weight = {x: y for x, y in zip(u, w)}
print(np.bincount(y), "- np.bincount(y) first sort ASC")
print("unique", u)
print(class_weight)
[100 10 300] - np.bincount(y) first sort ASC unique [0, 1, 2] {0: 1.3666666666666667, 1: 13.666666666666666, 2: 0.45555555555555555}
- numpy choose, oversampling
- 1
import numpy as np

y = [0]*100 + [1]*10 + [2]*300
u = sorted(set(y))
print(np.bincount(y), "- np.bincount(y) first sort ASC")
# -- oversampling
distrib = np.bincount(y)
prob = 1/distrib[y].astype(float)
prob /= prob.sum()
print("distrib =", distrib, distrib[y])
print("a =", np.arange(len(y)))
print("count after(size) =", np.count_nonzero(distrib)*distrib.max())
print("prob =", prob)
sel = np.random.choice(np.arange(len(y)), size=np.count_nonzero(distrib)*distrib.max(), p=prob).astype(int)
y = np.array(y)
print(y[np.random.choice(np.arange(len(y)), size=np.count_nonzero(distrib)*distrib.max(), p=prob)])
print(np.bincount(y[sel]))
[100 10 300] - np.bincount(y) first sort ASC
count after(size) = 900
[328 284 288]  <- np.bincount(y[sel]) after resampling (full distrib/prob/index dumps omitted)
- 2
- simple 1d arrays
import numpy as np

y = [0]*5 + [1]*2 + [2]*10
y = np.array(y)
x = np.array(list(range(len(y))))
xy = np.vstack([x, y]).transpose()
# ---------------------
unq, unq_idx = np.unique(y, return_inverse=True)
print("unq, unq_idx", unq, unq_idx)
unq_cnt = np.bincount(unq_idx)
print("unq_cnt", unq_cnt)
min = np.min(unq_cnt)
max = np.max(unq_cnt)
print("max", max, "min", min)
# cnt = round((max - min)/2 + min)
cnt = max
print("cnt", cnt)
print("y.shape[1:]", y.shape[1:])
out = np.empty((cnt*len(unq) - len(y),), y.dtype)
# out = np.empty((cnt*len(unq) - len(xy),) + xy.shape[1:], xy.dtype)
print("out.shape", out.shape, "xy.shape", xy.shape)
slices = np.concatenate(([0], np.cumsum(cnt - unq_cnt)))
print(slices)
for j in range(len(unq)):
    indices = np.random.choice(np.where(unq_idx == j)[0], cnt - unq_cnt[j])
    print("indices", indices)
    out[slices[j]:slices[j+1]] = y[indices]
    print("out", out)
# out = np.hstack((y, out))
print(out)
print(np.bincount(out), "- np.bincount(out) first sort ASC")
unq, unq_idx [0 1 2] [0 0 0 0 0 1 1 2 2 2 2 2 2 2 2 2 2] unq_cnt [ 5 2 10] max 10 min 2 cnt 10 y.shape[1:] () out.shape (13,) xy.shape (17, 2) [ 0 5 13 13] indices [0 4 0 4 4] out [ 0 0 0 0 0 140160696704256 140160713380912 140160696704416 140160696541680 94915202709280 0 172834964494878845 240] indices [6 6 5 6 5 5 5 6] out [0 0 0 0 0 1 1 1 1 1 1 1 1] indices [] out [0 0 0 0 0 1 1 1 1 1 1 1 1] [0 0 0 0 0 1 1 1 1 1 1 1 1] [5 8] - np.bincount(out) first sort ASC
- simple xy
import numpy as np

y = [0]*5 + [1]*2 + [2]*10
y = np.array(y)
x = np.array(list(range(len(y))))
xy = np.vstack([x, y]).transpose()
# ---------------------
unq, unq_idx = np.unique(y, return_inverse=True)
print("unq, unq_idx", unq, unq_idx)
unq_cnt = np.bincount(unq_idx)
print("unq_cnt", unq_cnt)
cnt = np.max(unq_cnt)
print("cnt", cnt)
print("y.shape[1:]", y.shape[1:])
# out = np.empty((cnt*len(unq) - len(y),), y.dtype)
out = np.empty((cnt*len(unq) - len(xy),) + xy.shape[1:], xy.dtype)
print("out.shape", out.shape, "xy.shape", xy.shape)
slices = np.concatenate(([0], np.cumsum(cnt - unq_cnt)))
print(slices)
for j in range(len(unq)):
    indices = np.random.choice(np.where(unq_idx == j)[0], cnt - unq_cnt[j])
    print("indices", indices)
    out[slices[j]:slices[j+1]] = xy[indices]
    print("out", out)
# out = np.hstack((y, out))
print(out)
# print(np.bincount(out), "- np.bincount(out) first sort ASC")
unq, unq_idx [0 1 2] [0 0 0 0 0 1 1 2 2 2 2 2 2 2 2 2 2] unq_cnt [ 5 2 10] cnt 10 y.shape[1:] () out.shape (13, 2) xy.shape (17, 2) [ 0 5 13 13] indices [2 3 3 2 4] out [[2 0] [3 0] [3 0] [2 0] [4 0] [0 0] [0 0] [0 0] [0 0] [0 0] [0 0] [0 0] [0 0]] indices [6 6 6 5 5 5 5 6] out [[2 0] [3 0] [3 0] [2 0] [4 0] [6 1] [6 1] [6 1] [5 1] [5 1] [5 1] [5 1] [6 1]] indices [] out [[2 0] [3 0] [3 0] [2 0] [4 0] [6 1] [6 1] [6 1] [5 1] [5 1] [5 1] [5 1] [6 1]] [[2 0] [3 0] [3 0] [2 0] [4 0] [6 1] [6 1] [6 1] [5 1] [5 1] [5 1] [5 1] [6 1]]
- full
import numpy as np

# ---------------------
def calc_oversampl(xy):
    unq, unq_idx = np.unique(xy[:, -1], return_inverse=True)
    unq_cnt = np.bincount(unq_idx)
    cnt = np.max(unq_cnt)
    out = np.empty((cnt*len(unq) - len(xy),) + xy.shape[1:], xy.dtype)
    slices = np.concatenate(([0], np.cumsum(cnt - unq_cnt)))
    for j in range(len(unq)):
        indices = np.random.choice(np.where(unq_idx == j)[0], cnt - unq_cnt[j])
        out[slices[j]:slices[j+1]] = xy[indices]
    # print(out)
    return np.vstack((xy, out))

v = [0]*5 + [1]*2 + [2]*1
v = np.array(v)
x = np.array(list(range(len(v))))
xy = np.vstack([x, v]).transpose()
print(xy)
print(np.bincount(xy[:, 1]))
out = calc_oversampl(xy)
# print(out)
print(np.bincount(out[:, 1]))
[[0 0] [1 0] [2 0] [3 1] [4 0] [5 1] [6 2] [7 0] [8 0] [9 0]] [7 2 1] [7 7 7]
- half
import numpy as np

def oversample(xy, maxc=None):
    unq, unq_idx = np.unique(xy[:, -1], return_inverse=True)
    unq_cnt = np.bincount(unq_idx)
    if maxc:
        cnt = maxc
    else:
        cnt = np.max(unq_cnt)
    out = np.empty((cnt*len(unq) - len(xy),) + xy.shape[1:], xy.dtype)
    slices = np.concatenate(([0], np.cumsum(cnt - unq_cnt)))
    for j in range(len(unq)):
        indices = np.random.choice(np.where(unq_idx == j)[0], cnt - unq_cnt[j])
        out[slices[j]:slices[j+1]] = xy[indices]
    return np.vstack((xy, out))

def oversamples_half(xy):
    # oversample only the classes whose example count is below max(count)//2;
    # rows of the remaining classes are kept unchanged
    unq, unq_idx = np.unique(xy[:, -1].astype(int), return_inverse=True)
    unq_cnt = np.bincount(unq_idx)
    cnt_half = np.max(unq_cnt) // 2
    use_u = unq[unq_cnt < cnt_half]
    use_i = np.vectorize(lambda x: x in use_u)(xy[:, -1])
    use = xy[use_i]
    not_use = xy[~use_i]
    # print("use", np.bincount(use[:,1].astype(int)))
    out = oversample(use, maxc=cnt_half)
    # print("out", np.bincount(out[:,1].astype(int)))
    return np.vstack((out, not_use))

xy = np.array([[0,0], [1,0], [2,0], [3,1], [4,0], [5,1], [6,3], [7,0], [8,0], [9,0]])
# xy[:,1].astype(int)
print(np.bincount(xy[:, 1].astype(int)))
out = oversamples_half(xy)
print(np.bincount(out[:, 1].astype(int)))
print(out)
# print(np.bincount(out[:,1].astype(int)))
[7 2 0 1] use [0 2 0 1] out [0 3 0 3] [7 3 0 3] [[3 1] [5 1] [6 3] [3 1] [6 3] [6 3] [0 0] [1 0] [2 0] [4 0] [7 0] [8 0] [9 0]]
10.45. common errors:
ValueError: Input 0 of layer "model" is incompatible with the layer: expected shape=(None, 200, 60, 1), found shape=(None, 60, 1)
- print(type(input))
- input: class =
- tf.expand_dims(encsample["image"], axis=0)
tf.data.Dataset, data = next(iterator): "Cannot add tensor to the batch: number of elements does not match. Shapes are: [tensor]: [4], [batch]: [5]"
- solutions:
- .padded_batch (see the sketch below)
- .apply(tf.data.experimental.dense_to_ragged_batch(…))
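A minimal padded_batch sketch on a toy dataset of variable-length vectors; the values and lengths are made up just to reproduce the "different lengths in one batch" situation:
import tensorflow as tf

# toy dataset with elements of different lengths (the case that breaks a plain .batch())
ds = tf.data.Dataset.from_generator(
    lambda: ([1] * n for n in [4, 5, 3]),
    output_signature=tf.TensorSpec(shape=(None,), dtype=tf.int32))

# pad every element of a batch up to the longest element in that batch
padded = ds.padded_batch(batch_size=2, padded_shapes=(None,))
for batch in padded:
    print(batch.shape)  # (2, 5) then (1, 3)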
11. PyTorch
install: https://pytorch.org/get-started/locally/ examples: https://github.com/pytorch/examples/
- GPU Tensors, Dynamic Neural Networks and deep Python integration
- This is closer to writing code in any language as a for loop in code will behave as a for loop inside the graph structure as well.
- TensorFlow doesn’t handle dynamic graphs very well though there are some not so flexible and frankly quite limiting primitive dynamic constructs.
- Intel MKL and NVIDIA (CuDNN, NCCL) support
- have their own official model repositories,
PyTorch:
- replacement for NumPy to use the power of GPUs
- deep learning research platform
HuggingFace: most models are implemented in PyTorch
11.1. install
May 8, 2023
pip3 install torch==2.0.1 torchvision==0.15.2 torchaudio==2.0.2 --index-url https://download.pytorch.org/whl/cu118
11.2. history
- 2002 - Torch (picked up by Facebook AI Research). Lua + C. three key features:
- ease the development of numerical algorithms.
- easily extended
- fast
- 2017 PyTorch beta.
- Caffe2 was merged into PyTorch at the end of March 2018
- 1.13
- BetterTransformer supports fastpath execution for common Transformer models during Inference out-of-the-box, without the need to modify the model.
- Functorch now in PyTorch Core Library - composable vmap (vectorization) and autodiff transforms.
- PyTorch 2.0 has been released on 15 March 2023 (2-series)
- PyTorch 2.2 SDPA FlashAttention-2, TorchInductor, device_mesh, TORCH_LOGS.
11.2.1. PyTorch 2.0
- fundamentally changing and supercharging how PyTorch operates at compiler level under the hood.
- faster performance and support for Dynamic Shapes and Distributed.
- torch.compile - from C++ back into Python - additive (and optional) feature
- 2.0 is 100% backward compatible
TorchDynamo
AOTAutograd
PrimTorch
TorchInductor
Compilation steps:
- graph acquisition - TorchDynamo + AOTAutograd
- graph lowering - ATen/ Prim IR
- graph compilation - TorchInductor(default) powered by Triton. Features:
- your own backend
- nvFuser
- TVM
- XLA
- AITemplate
- TensorRT
11.2.2. FlashAttention-2 - approximate attention method
FlashAttention: Fast and Memory-Efficient Exact Attention with IO-Awareness
Transformers: time and memory complexity of self-attention are quadratic in sequence length.
FlashAttention trains Transformers faster than existing baselines: 15% end-to-end wall-clock speedup on BERT-large
11.3. deployment
- TorchServe
- endpoint specification, model archiving, and observing metrics
- provide REST and gRPC APIs
- still in its infancy
- PyTorch Live - build upon old PyTorch Mobile
- uses JavaScript and React Native to create cross-platform iOS and Android AI-powered apps
- focuses on mobile only
11.4. ecosystem
https://pytorch.org/ecosystem/
- PyTorch Hub https://pytorch.org/docs/stable/hub.html
- sharing repositories with pre-trained models
- PyTorch-XLA https://pytorch.org/xla/release/1.9/index.html
- train PyTorch models on Google's Cloud TPUs
- TorchVision - Computer Vision library https://github.com/pytorch/vision https://pytorch.org/vision
- example models TIMM (pyTorch IMage Models) https://github.com/rwightman/pytorch-image-models
- TorchText - Natural Language Processing https://pytorch.org/text/stable/index.html
- utilities and datasets
- Facebook AI Research Sequence-to-Sequence Toolkit https://github.com/pytorch/fairseq
- TorchAudio - ASR - https://pytorch.org/audio/stable/index.html and https://github.com/pytorch/audio
- includes popular audio models like DeepSpeech and Wav2Vec
- https://pytorch.org/audio/stable/tutorials/speech_recognition_pipeline_tutorial.html
- https://pytorch.org/audio/stable/pipelines.html
- SpeechBrain - speech toolkit for PyTorch
- ASR, speaker recognition, verification and diarization, and more!
- ESPnet - toolkit for end-to-end speech processing.
- speech recognition, translation, diarization,
- AllenNLP - open-source NLP research library
11.5. PyTorch 2.0
https://pytorch.org/get-started/pytorch-2.0
features:
- model compilation or compiled mode - wraps your model and returns a compiled model.
- will allow models to be ahead-of-time compiled for lightning-fast execution.
- compiles the forward function to a more optimized version.
- When compiling the model, we give a few knobs to adjust it.
- drop-in replacement for torch.jit.script()
- make distributed training simpler too
- TorchDynamo allows accessing model attributes like weights and modifying them (a minimal torch.compile sketch follows)
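A minimal sketch of compiled mode; the tiny Sequential model is a placeholder, and "reduce-overhead" is just one of the documented preset modes:
import torch

model = torch.nn.Sequential(torch.nn.Linear(16, 32), torch.nn.ReLU(), torch.nn.Linear(32, 2))
compiled_model = torch.compile(model)                            # default backend (TorchInductor)
# compiled_model = torch.compile(model, mode="reduce-overhead")  # one of the preset modes

x = torch.randn(8, 16)
y = compiled_model(x)   # first call triggers compilation, later calls reuse the compiled graph
print(y.shape)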
famous models:
- DALL-E 2
- Stable Diffusion
- ChatGPT.
torch.distributed
- DistributedDataParallel (DDP) - relies on overlapping AllReduce communications with backwards computation
- FullyShardedDataParallel (FSDP) - “beta”
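A minimal DDP sketch, assuming the script is launched with torchrun so that RANK/LOCAL_RANK/WORLD_SIZE are set; the model and the batch are placeholders:
import os
import torch
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP

def main():
    # torchrun sets RANK, LOCAL_RANK and WORLD_SIZE for every process
    dist.init_process_group(backend="nccl" if torch.cuda.is_available() else "gloo")
    local_rank = int(os.environ.get("LOCAL_RANK", 0))
    device = torch.device(f"cuda:{local_rank}" if torch.cuda.is_available() else "cpu")

    model = torch.nn.Linear(10, 2).to(device)   # placeholder model
    ddp_model = DDP(model, device_ids=[local_rank] if torch.cuda.is_available() else None)

    optimizer = torch.optim.SGD(ddp_model.parameters(), lr=0.01)
    x, y = torch.randn(32, 10).to(device), torch.randn(32, 2).to(device)  # placeholder batch
    loss = torch.nn.functional.mse_loss(ddp_model(x), y)
    loss.backward()        # gradients are all-reduced across workers here
    optimizer.step()
    dist.destroy_process_group()

if __name__ == "__main__":
    main()   # run with: torchrun --nproc_per_node=2 this_script.py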
11.6. device
import torch

# Set the device
device = "cuda" if torch.cuda.is_available() else "cpu"
# Set the device globally
torch.set_default_device(device)

if device == "cuda":
    GPU_SCORE = torch.cuda.get_device_capability()
    # optimization - perform faster matrix multiplications
    if GPU_SCORE >= (8, 0):
        print(f"[INFO] Using GPU with score: {GPU_SCORE}, enabling TensorFloat32 (TF32) computing (faster on new GPUs)")
        torch.backends.cuda.matmul.allow_tf32 = True
    else:
        print(f"[INFO] Using GPU with score: {GPU_SCORE}, TensorFloat32 (TF32) not available, to use it you need a GPU with score >= (8, 0)")
        torch.backends.cuda.matmul.allow_tf32 = False
11.7. models - torchvision.models
import torchvision.models as models
# from torchvision.models import resnet50

resnet = models.resnet50(weights=None)  # random initialization
Torch Hub
import torch

# Option 1: passing weights param as string
model = torch.hub.load("pytorch/vision", "resnet50", weights="IMAGENET1K_V2")
11.8. nn.Module
- model.parameters() - the learnable parameters (i.e. weights and biases)
- model.state_dict() is simply a Python dictionary object that maps each layer to its parameter tensor.
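A tiny illustration of both calls on a single Linear layer:
import torch

model = torch.nn.Linear(4, 2)
print(sum(p.numel() for p in model.parameters()))  # 10 learnable values: 4*2 weights + 2 biases
print(list(model.state_dict().keys()))             # ['weight', 'bias'] mapped to their tensors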
11.8.1. nn.Linear
y = x*(A^T) + b; the transpose is there because nn.Linear stores its weight with shape (out_features, in_features)
import numpy as np

m = np.random.random((2, 3))       # weight of Linear(in_features=2, out_features=3), already transposed
input = np.random.random((10, 2))  # batch of 10 samples with 2 features
print(np.matmul(input, m).shape)
(10, 3)
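The same check in PyTorch itself, showing that the stored weight has shape (out_features, in_features), which is why the formula uses A^T:
import torch

layer = torch.nn.Linear(in_features=2, out_features=3)
print(layer.weight.shape)   # torch.Size([3, 2]) -> (out_features, in_features)
x = torch.randn(10, 2)
print(layer(x).shape)       # torch.Size([10, 3])
print(torch.allclose(layer(x), x @ layer.weight.T + layer.bias))  # True: y = x A^T + b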
11.9. Dataset and DataLoader, transform
Dataset - retrieves our dataset’s features and labels one sample at a time.
- from torch.utils.data import Dataset (must be created)
- Dataset - map-style datasets, - __getitem__() and __len__(), accessible with dataset[idx]
- IterableDataset - iterable-style datasets. - __iter__() - when called iter(dataset), could return a stream
of data reading from a database, a remote server, or even logs generated in real time.
- multi-process data loading.
- DataLoader - minibatches, reshuffle the data at every epoch to reduce model overfitting, and use Python’s multiprocessing to speed up data retrieval.
- Dataset -> Sampler -> BatchSampler + Dataset -> Data batch
- from torch.utils.data import DataLoader (accept Dataset as constructor argument)
The purpose of a sampler is to determine how batches should be formed; samplers are passed to a PyTorch DataLoader (see the sketch below).
- When the DataLoader is initialized, the sampler is also passed to it (RandomSampler by default); it first creates the order in which the samples of the dataset are accessed by index, i.e. (1, 2, 3, ..., N) where N = size of the dataset.
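A sketch of passing a non-default sampler to a DataLoader, here WeightedRandomSampler on toy tensors; this also ties in with the imbalanced-dataset notes in the TF section:
import torch
from torch.utils.data import TensorDataset, DataLoader, WeightedRandomSampler

# toy imbalanced dataset: 90 samples of class 0, 10 of class 1
y = torch.cat([torch.zeros(90, dtype=torch.long), torch.ones(10, dtype=torch.long)])
x = torch.randn(100, 3)
dataset = TensorDataset(x, y)

# per-sample weights: the rarer class gets a larger probability of being drawn
class_count = torch.bincount(y).float()
sample_weights = 1.0 / class_count[y]
sampler = WeightedRandomSampler(sample_weights, num_samples=len(y), replacement=True)

loader = DataLoader(dataset, batch_size=20, sampler=sampler)  # sampler replaces shuffle=True
xb, yb = next(iter(loader))
print(torch.bincount(yb))   # classes roughly balanced within the batch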
test Dataset:
img, lab = train_dataset.__getitem__(0)
test DataLoader:
img, lab = next(iter(train_loader))
Transform - part of the Dataset implementation, applied in __getitem__()
- from torchvision import transforms https://pytorch.org/vision/stable/transforms.html
sample = self.transform(sample) ; return sample
Approach 2):
- train_dataset = torchvision.datasets.ImageFolder(root='aa/train', transform=MyTransform)
11.9.1. code
import torch import torchvision.models as models from torch.utils.data import Dataset from torch.utils.data import DataLoader from torchvision.io import read_image from torchvision import transforms IMG_WIDTH = 64 IMG_HEIGHT = 64 # - image format default_float_dtype = torch.get_default_dtype() class LandmarkDataset(Dataset): def __init__(self, paths, labels, transform=None, target_transform=None): self.paths = paths self.labels = labels self.transform = transform self.target_transform = target_transform def __len__(self): return len(self.labels) def __getitem__(self, idx): image = read_image(self.paths[idx]) image = image.to(dtype=default_float_dtype).div(255) label = self.labels[idx] if self.transform: image = self.transform(image) if self.target_transform: label = self.target_transform(label) return image, label def main(): x_train, y_train = get_dataset() data_transform = transforms.Compose([ transforms.RandomResizedCrop((IMG_HEIGHT, IMG_WIDTH)), # transforms.ToTensor() # to [0.0, 1.0] ]) train_dataset: Dataset = LandmarkDataset(x_train, y_train, transform=data_transform) train_loader: DataLoader = DataLoader(train_dataset) # img, lab = train_dataset.__getitem__(0) img, lab = next(iter(train_loader)) print(img, lab)
11.10. Built-in datasets
all datasets return PIL Image: Image.fromarray(img.numpy(), mode="L")
- from PIL import Image
training.pt We no longer cache the data in a custom binary, but simply read from the raw data directly.
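A minimal sketch of loading a built-in torchvision dataset (the ./data root is arbitrary):
import torchvision
from torchvision.transforms import ToTensor

# downloads MNIST into ./data on first use; ToTensor converts the PIL Image to a [0, 1] float tensor
train_ds = torchvision.datasets.MNIST(root="./data", train=True, download=True, transform=ToTensor())
img, label = train_ds[0]
print(img.shape, label)   # torch.Size([1, 28, 28]) 5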
11.11. train
from datetime import datetime import torch import torchvision.models as models from torch.utils.data import Dataset from torch.utils.data import DataLoader from torchvision.io import read_image from torchvision import transforms class LandmarkDataset(Dataset): def __init__(self, paths, labels, transform=None, target_transform=None): self.paths = paths self.labels = labels self.transform = transform self.target_transform = target_transform def __len__(self): return len(self.labels) def __getitem__(self, idx): image = read_image(self.paths[idx]) image = image.to(dtype=default_float_dtype).div(255) label = self.labels[idx] if self.transform: image = self.transform(image) if self.target_transform: label = self.target_transform(label) # return image, label return image.to(device), torch.tensor(label, dtype=torch.long).to(device) def train_one_epoch(epoch_index, training_loader, optimizer, model, loss_fn, tb_writer=None): """ training_loader is (inputs, labels) """ running_loss = 0. last_loss = 0. avg_loss = 0. for i, data in enumerate(training_loader): inputs, labels = data optimizer.zero_grad() outputs = model(inputs) loss = loss_fn(outputs, labels) loss.backward() optimizer.step() running_loss += loss.item() if i % 10 == 9: avg_loss = running_loss / (1 if i // 10 == 0 else i // 10) print(' batch {} loss: {}'.format(i + 1, round(avg_loss,2))) # tb_x = epoch_index * len(training_loader) + i + 1 # tb_writer.add_scalar('Loss/train', last_loss, tb_x) # running_loss = 0. return avg_loss def train(model, training_loader, validation_loader, loss_fn, wirter=None): # require import datetime timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') # writer = SummaryWriter('runs/fashion_trainer_{}'.format(timestamp)) epoch_number = 0 EPOCHS = 2 best_vloss = 1_000_000. for epoch in range(EPOCHS): print('EPOCH {}:'.format(epoch_number + 1)) # ---- train ---- model.train(True) avg_loss = train_one_epoch(epoch_number, training_loader=training_loader, # optimizer=torch.optim.SGD(model.parameters(), lr=0.001, momentum=0.9), optimizer=torch.optim.Adam(model.parameters()), model=model, loss_fn=loss_fn, tb_writer=None) running_vloss = 0.0 # ---- validate ---- model.eval() # - Disable gradient computation and reduce memory consumption. with torch.no_grad(): for i, vdata in enumerate(validation_loader): vinputs, vlabels = vdata voutputs = model(vinputs) vloss = loss_fn(voutputs, vlabels) running_vloss += vloss avg_vloss = running_vloss / (i + 1) print('LOSS train {} valid {}'.format(avg_loss, avg_vloss)) # writer.add_scalars('Training vs. 
Validation Loss', # { 'Training' : avg_loss, 'Validation' : avg_vloss }, # epoch_number + 1) # writer.flush() if avg_vloss < best_vloss: best_vloss = avg_vloss model_path = 'model_{}_{}'.format(timestamp, epoch_number) torch.save(model.state_dict(), model_path) # save the model's state epoch_number += 1 def create_model(classes) -> torch.nn.Module: resnet = models.resnet50(weights=None) num_ftrs = resnet.fc.in_features resnet.fc = torch.nn.Linear(num_ftrs, out_features=classes) return resnet def main(): x_train, x_valid, y_train, y_valid, OUTPUT_SIZE = get_dataset() data_transform = transforms.Compose([ transforms.RandomResizedCrop((IMG_HEIGHT, IMG_WIDTH)), # transforms.ToTensor() # to [0.0, 1.0] ]) train_dataset: Dataset = LandmarkDataset(x_train, y_train, transform=data_transform) from torch.utils.data.dataloader import default_collate generator = torch.Generator(device=device) train_loader: DataLoader = DataLoader(train_dataset, shuffle=True, batch_size=BATCH_SIZE, generator=generator) # , pin_memory_device=device, pin_memory=True # collate_fn=lambda x: (default_collate(x[0]).to(device), default_collate(torch.from_numpy(x[1])).to(device)) valid_dataset: Dataset = LandmarkDataset(x_valid, y_valid, transform=data_transform) valid_loader: DataLoader = DataLoader(valid_dataset) # img, lab = train_dataset.__getitem__(0) # img, lab = next(iter(train_loader)) # print(img, lab) # -- train model: torch.nn.Module = create_model(OUTPUT_SIZE) # load model definition print(model) train(model, training_loader=train_loader, validation_loader=valid_loader, loss_fn=torch.nn.CrossEntropyLoss()) # -- save, load and inference import os PATH = os.path.join(os.getcwd(), 'savedmodel') torch.save(model.state_dict(), PATH)
11.12. train (old)
data, target = data.to(device), target.to(device)
optimizer.zero_grad()
output = model(data)
loss = F.nll_loss(output, target)
loss.backward(retain_graph=True)
optimizer.step()
When we call loss.backward() - all Tensors in the graph that has requires_grad=True will have their .grad Tensor accumulated with the gradient.
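A two-line illustration of that accumulation, on a fresh scalar tensor unrelated to the training loop above:
import torch

x = torch.tensor([2.0], requires_grad=True)
(x ** 2).backward()
print(x.grad)   # tensor([4.]) -> d(x^2)/dx at x=2
(x ** 2).backward()
print(x.grad)   # tensor([8.]) -> gradients accumulate until optimizer.zero_grad() / x.grad.zero_()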
11.13. loss, inference, accuracy
import torch
import numpy as np

loss = torch.nn.CrossEntropyLoss()
input = torch.randn(3, 5, requires_grad=True)
target = torch.empty(3, dtype=torch.long).random_(5)
output = loss(input, target)
output.backward()
print(output)

# after save:
model = create_model(OUTPUT_SIZE)
model.load_state_dict(torch.load(PATH))
model.eval()

# -- inference
img, lab = next(iter(DataLoader(valid_dataset, shuffle=True, batch_size=1, generator=generator)))  # get a random item
print("lab", lab)
result: torch.Tensor = model(img)
print("result", np.argmax(result.cpu().detach().numpy()))
Accuracy:
import torch
from torchmetrics import Accuracy  # separate torchmetrics package; torch itself has no torch.metrics

target = torch.tensor([0, 1, 1])
preds = torch.tensor([[0.1, 0.9, 0], [0.3, 0.1, 0.6], [0.2, 0.5, 0.3]])
accuracy = Accuracy(task="multiclass", num_classes=3, top_k=2)
print(accuracy(preds, target))
11.14. numpy
import torch x = torch.empty(5, 3) print(x) print(x.size()) >> torch.Size([5, 3]) # Converting a Torch Tensor to a NumPy Array n = torch.ones(5).numpy() # Converting NumPy Array to Torch Tensor t = torch.from_numpy(a) # tensors on CUDA if torch.cuda.is_available(): device = torch.device("cuda") # a CUDA device object y = torch.ones_like(x, device=device) x = x.to(device) z = x + y print(z.to("cpu", torch.double)) # back to cpu # random x = torch.randn(4, 4) # from a normal distribution - mean 0 and variance 1 x = torch.rand(4, 4) # on the interval [0,1) # resize/reshape y = x.view(16) # line z = x.view(-1, 8) # column: torch.Size([2, 8]) # torch.squeeze(input, dim=None, out=None) → Tensor # tensor(A×1×B×C×1×D) >>> x = torch.zeros(2, 1, 2, 1, 2) torch.Size([2, 1, 2, 1, 2]) # выжимать remove 1 size dimensions >>> y = torch.squeeze(x) # torch.Size([2, 2, 2]) >>> y = torch.squeeze(x, 0) torch.Size([2, 1, 2, 1, 2]) >>> y = torch.squeeze(x, 1) torch.Size([2, 2, 1, 2]) # Concatenates sequence of tensors along a new dimension: torch.stack(tensors: list, dim=0, out=None) → Tensor # transpose t = torch.tensor([[1,2,3],[4,5,6]]) torch.transpose(t,0,1) >tensor([[1, 4], [2, 5], [3, 6]]) # add dimension >> torch.Size([1, 2]) a.unsqueeze(0).size() >> torch.Size([1, 1, 2]) a.unsqueeze(-1).size() >> torch.Size([1, 2, 1])
11.15. layers
import torch.nn as nn
import torch.nn.functional as F  # activation
- CNN
- nn.Conv2d(1, 32, kernel_size=(3, 3), stride=(1, 1)) -
11.16. noise
r = (0.1**0.9) * torch.randn(self.levels, batch, self.hidden_size//2, dtype=dtype, device=self.device)
self.hidden1 = (self.hidden1[0] + r, self.hidden1[1] + r)
11.17. basic nn and gradient
input 32x32
torch.Size([64, 32, 26, 26]) - batch_size, output_channels, Height, Width
Trainable parameters:
params = sum(p.numel() for p in model.parameters() if p.requires_grad)
print(f"Trainable parameters: {params:,}")
Recap:
- torch.Tensor - A multi-dimensional array with support for autograd operations like backward(). Also holds
the gradient w.r.t. the tensor.
- if .requires_grad is True, it starts to track all operations on it; gradients are accumulated into .grad
- with torch.no_grad(): - for testing
- nn.Module - Neural network module. Convenient way of encapsulating parameters, with helpers for moving them to GPU, exporting, loading, etc.
- nn.Parameter - A kind of Tensor, that is automatically registered as a parameter when assigned as an attribute to a Module.
- autograd.Function - Implements forward and backward definitions of an autograd operation. Every Tensor operation creates at least a single Function node that connects to functions that created a Tensor and encodes its history.
11.17.1. first
import torch import torch.nn as nn # layer import torch.nn.functional as F # activation class Net(nn.Module): def __init__(self): super(Net, self).__init__() # 1 input image channel, 6 output channels, 3x3 square convolution # kernel self.conv1 = nn.Conv2d(1, 6, 3) # input 1 image to 6, 3x3 kernel, stride=1 default self.conv2 = nn.Conv2d(6, 16, 3) self.dropout1 = nn.Dropout2d(0.25) # an affine operation: y = Wx + b self.fc1 = nn.Linear(16 * 6 * 6, 120) # 6*6 from image dimension self.fc2 = nn.Linear(120, 84) self.fc3 = nn.Linear(84, 10) def forward(self, x): # Max pooling over a (2, 2) window x = F.max_pool2d(F.relu(self.conv1(x)), (2, 2)) # If the size is a square you can only specify a single number x = F.max_pool2d(F.relu(self.conv2(x)), 2) x = x.view(-1, self.num_flat_features(x)) x = F.relu(self.fc1(x)) x = self.dropout1(x) x = F.relu(self.fc2(x)) x = self.fc3(x) return x def num_flat_features(self, x): size = x.size()[1:] # all dimensions except the batch dimension num_features = 1 for s in size: num_features *= s return num_features net = Net() print(net) # print all layers params = list(net.parameters()) # learnable parameters of a model import torch.optim as optim # create your optimizer optimizer = optim.SGD(net.parameters(), lr=0.01) # in your training loop: optimizer.zero_grad() # zero the gradient buffers output = net(input) loss = criterion(output, target) loss.backward() optimizer.step() # Does the updatee
11.17.2. second
import argparse
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms
from torch.optim.lr_scheduler import StepLR

class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(1, 32, 3, 1)
        self.conv2 = nn.Conv2d(32, 64, 3, 1)
        self.dropout1 = nn.Dropout2d(0.25)
        self.dropout2 = nn.Dropout2d(0.5)
        self.fc1 = nn.Linear(9216, 128)
        self.fc2 = nn.Linear(128, 10)

    def forward(self, x):
        x = self.conv1(x)
        x = F.relu(x)
        x = self.conv2(x)
        x = F.max_pool2d(x, 2)
        x = self.dropout1(x)
        x = torch.flatten(x, 1)
        x = self.fc1(x)
        x = F.relu(x)
        x = self.dropout2(x)
        x = self.fc2(x)
        output = F.log_softmax(x, dim=1)
        return output

def train(args, model: nn.Module, device, train_loader, optimizer, epoch):
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        output = model(data)
        loss = F.nll_loss(output, target)
        loss.backward()
        optimizer.step()
        if batch_idx % args.log_interval == 0:
            print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                epoch, batch_idx * len(data), len(train_loader.dataset),
                100. * batch_idx / len(train_loader), loss.item()))

def test(args, model: nn.Module, device, test_loader):
    model.eval()
    test_loss = 0
    correct = 0
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            test_loss += F.nll_loss(output, target, reduction='sum').item()  # sum up batch loss
            pred = output.argmax(dim=1, keepdim=True)  # get the index of the max log-probability
            correct += pred.eq(target.view_as(pred)).sum().item()
    test_loss /= len(test_loader.dataset)
    print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
        test_loss, correct, len(test_loader.dataset),
        100. * correct / len(test_loader.dataset)))

def main():
    # Training settings
    parser = argparse.ArgumentParser(description='PyTorch MNIST Example')
    parser.add_argument('--batch-size', type=int, default=64, metavar='N',
                        help='input batch size for training (default: 64)')
    parser.add_argument('--test-batch-size', type=int, default=1000, metavar='N',
                        help='input batch size for testing (default: 1000)')
    parser.add_argument('--epochs', type=int, default=14, metavar='N',
                        help='number of epochs to train (default: 14)')
    parser.add_argument('--lr', type=float, default=1.0, metavar='LR',
                        help='learning rate (default: 1.0)')
    parser.add_argument('--gamma', type=float, default=0.7, metavar='M',
                        help='Learning rate step gamma (default: 0.7)')
    parser.add_argument('--no-cuda', action='store_true', default=False,
                        help='disables CUDA training')
    parser.add_argument('--seed', type=int, default=1, metavar='S',
                        help='random seed (default: 1)')
    parser.add_argument('--log-interval', type=int, default=10, metavar='N',
                        help='how many batches to wait before logging training status')
    parser.add_argument('--save-model', action='store_true', default=False,
                        help='For Saving the current Model')
    args = parser.parse_args()
    use_cuda = not args.no_cuda and torch.cuda.is_available()
    # random seed
    torch.manual_seed(args.seed)
    device = torch.device("cuda" if use_cuda else "cpu")
    kwargs = {'num_workers': 1, 'pin_memory': True} if use_cuda else {}
    train_loader = torch.utils.data.DataLoader(
        datasets.MNIST('../data', train=True, download=True,
                       transform=transforms.Compose([
                           transforms.ToTensor(),
                           transforms.Normalize((0.1307,), (0.3081,))
                       ])),
        batch_size=args.batch_size, shuffle=True, **kwargs)
    test_loader = torch.utils.data.DataLoader(
        datasets.MNIST('../data', train=False,
                       transform=transforms.Compose([
                           transforms.ToTensor(),
                           transforms.Normalize((0.1307,), (0.3081,))
                       ])),
        batch_size=args.test_batch_size, shuffle=True, **kwargs)
    # count trainable parameters
    model: nn.Module = Net()
    # print(model.parameters())
    params = sum(p.numel() for p in model.parameters() if p.requires_grad)
    print(f"Trainable parameters: {params:,}")
    # load model to GPU
    model = Net().to(device)
    # optimizer
    optimizer = optim.Adadelta(model.parameters(), lr=args.lr)
    scheduler = StepLR(optimizer, step_size=1, gamma=args.gamma)
    for epoch in range(1, args.epochs + 1):
        train(args, model, device, train_loader, optimizer, epoch)
        test(args, model, device, test_loader)
        scheduler.step()
    if args.save_model:
        torch.save(model.state_dict(), "mnist_cnn.pt")

if __name__ == '__main__':
    main()
11.18. LSTM
- tutor https://pytorch.org/tutorials/beginner/nlp/sequence_models_tutorial.html
- doc https://pytorch.org/docs/stable/nn.html#recurrent-layers
- from Stratch https://mlexplained.com/2019/02/15/building-an-lstm-from-scratch-in-pytorch-lstms-in-depth-part-1/
- article https://towardsdatascience.com/lstm-for-time-series-prediction-de8aeb26f2ca
- article https://stackabuse.com/time-series-prediction-using-lstm-with-pytorch-in-python/
- github chinese https://github.com/TankZhouFirst/Pytorch-LSTM-Stock-Price-Predict/blob/master/LSTM%E5%AE%9E%E7%8E%B0%E8%82%A1%E7%A5%A8%E9%A2%84%E6%B5%8B--pytorch%20%E7%89%88%E6%9C%AC-V2.0.ipynb
11.18.1. nn.LSTM
expects all of its inputs to be 3D tensors; the three axes are:
- the sequence itself (time steps)
- the instances in the mini-batch
- the elements of the input (features)
rnn = nn.LSTM(input_size=10, hidden_size=20, num_layers=2) input = torch.randn(5, 3, 10) h0 = torch.randn(2, 3, 20) # layers, batch size, hidden c0 = torch.randn(2, 3, 20) output, (hn, cn) = rnn(input, (h0, c0))
If the following conditions are satisfied, persistent algorithm can be selected to improve performance:
- cudnn is enabled
- input data is on the GPU
- input data has dtype torch.float16
- V100 GPU is used,
- input data is not in PackedSequence format
11.18.2. nn.LSTMCell
rnn = nn.LSTMCell(input_size=10, hidden_size=20) input = torch.randn(6, 3, 10) # 3=batch size hx = torch.randn(3, 20) # batch_size, hidden_size cx = torch.randn(3, 20) output = [] for i in range(6): hx, cx = rnn(input[i], (hx, cx)) output.append(hx)
11.18.3. numbers of parameters
gate_size = 4 * hidden_size                                   # 4 gates: input, forget, cell, output
w_ih = Parameter(torch.Tensor(gate_size, layer_input_size))
w_hh = Parameter(torch.Tensor(gate_size, hidden_size))
b_ih = Parameter(torch.Tensor(gate_size))
b_hh = Parameter(torch.Tensor(gate_size))
layer_params = (w_ih, w_hh, b_ih, b_hh)                       # one LSTM layer
Per layer: 4*hs*is + 4*hs*hs + 4*hs + 4*hs = 4*hs*(is + hs + 2) parameters, where is = input size of that
layer and hs = hidden_size. For the basic example below (input_size=1, hidden_size=1) each of the four
tensors holds 4 values, so 4*4 = 16 parameters (see the check below).
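A quick check of the formula against PyTorch's own parameter count (a minimal sketch of mine; the layer
sizes are arbitrary):
import torch.nn as nn

def lstm_param_count(input_size, hidden_size, num_layers=1):
    """4*hs*(is + hs + 2) per layer; deeper layers see hidden_size as their input size."""
    total = 0
    for layer in range(num_layers):
        layer_input = input_size if layer == 0 else hidden_size
        total += 4 * hidden_size * (layer_input + hidden_size + 2)
    return total

lstm = nn.LSTM(input_size=10, hidden_size=20, num_layers=2)
assert lstm_param_count(10, 20, 2) == sum(p.numel() for p in lstm.parameters())
print(lstm_param_count(10, 20, 2))  # 5920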
11.18.4. basic
import torch import torch.nn as nn # num_layers = 1, bias=True, bidirectional=False lstm = nn.LSTM(input_size=1, hidden_size=1) inputs = [torch.randn(1, 1) for _ in range(5)] # make a sequence of length 5 # initialize the hidden state. hidden = (torch.randn(1, 1, 1), torch.randn(1, 1, 1)) for i in inputs: # Step through the sequence one element at a time. # after each step, hidden contains the hidden state. out, hidden = lstm(i.view(1, 1, -1), hidden) # alternatively, we can do the entire sequence all at once. # the first value returned by LSTM is all of the hidden states throughout # the sequence. the second is just the most recent hidden state # (compare the last slice of "out" with "hidden" below, they are the same) # The reason for this is that: # "out" will give you access to all hidden states in the sequence # "hidden" will allow you to continue the sequence and backpropagate, # by passing it as an argument to the lstm at a later time # Add the extra 2nd dimension inputs = torch.cat(inputs).view(len(inputs), 1, -1) hidden = (torch.randn(1, 1, 1), torch.randn(1, 1, 1)) # clean out hidden state out, (hn, cn) = lstm(inputs, hidden) params = sum(p.numel() for p in lstm.parameters()) print(list(lstm.parameters())) print(f"Trainable parameters: {params:,}") print(out) print(hn) print(cn)
11.18.5. tagging model
class LSTMTagger(nn.Module): def __init__(self, embedding_dim, hidden_dim, vocab_size, tagset_size): super(LSTMTagger, self).__init__() self.hidden_dim = hidden_dim self.word_embeddings = nn.Embedding(vocab_size, embedding_dim) # The LSTM takes word embeddings as inputs, and outputs hidden states # with dimensionality hidden_dim. self.lstm = nn.LSTM(embedding_dim, hidden_dim) # The linear layer that maps from hidden state space to tag space self.hidden2tag = nn.Linear(hidden_dim, tagset_size) def forward(self, sentence): embeds = self.word_embeddings(sentence) lstm_out, _ = self.lstm(embeds.view(len(sentence), 1, -1)) tag_space = self.hidden2tag(lstm_out.view(len(sentence), -1)) tag_scores = F.log_softmax(tag_space, dim=1) return tag_scores model = LSTMTagger(EMBEDDING_DIM, HIDDEN_DIM, len(word_to_ix), len(tag_to_ix)) loss_function = nn.NLLLoss() optimizer = optim.SGD(model.parameters(), lr=0.1) # See what the scores are before training # Note that element i,j of the output is the score for tag j for word i. # Here we don't need to train, so the code is wrapped in torch.no_grad() with torch.no_grad(): inputs = prepare_sequence(training_data[0][0], word_to_ix) tag_scores = model(inputs) print(tag_scores) for epoch in range(300): # again, normally you would NOT do 300 epochs, it is toy data for sentence, tags in training_data: # Step 1. Remember that Pytorch accumulates gradients. # We need to clear them out before each instance model.zero_grad() # Step 2. Get our inputs ready for the network, that is, turn them into # Tensors of word indices. sentence_in = prepare_sequence(sentence, word_to_ix) targets = prepare_sequence(tags, tag_to_ix) # Step 3. Run our forward pass. tag_scores = model(sentence_in) # Step 4. Compute the loss, gradients, and update the parameters by # calling optimizer.step() loss = loss_function(tag_scores, targets) loss.backward() optimizer.step() # See what the scores are after training with torch.no_grad(): inputs = prepare_sequence(training_data[0][0], word_to_ix) tag_scores = model(inputs) # The sentence is "the dog ate the apple". i,j corresponds to score for tag j # for word i. The predicted tag is the maximum scoring tag. # Here, we can see the predicted sequence below is 0 1 2 0 1 # since 0 is index of the maximum value of row 1, # 1 is the index of maximum value of row 2, etc. # Which is DET NOUN VERB DET NOUN, the correct sequence! print(tag_scores)
11.18.6. variable-sized mini-batches
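A minimal sketch of the standard approach with pad_sequence + pack_padded_sequence (sequence lengths and
layer sizes here are arbitrary examples, not from any project):
import torch
import torch.nn as nn
from torch.nn.utils.rnn import pad_sequence, pack_padded_sequence, pad_packed_sequence

# three sequences of different lengths, feature size 10
seqs = [torch.randn(5, 10), torch.randn(3, 10), torch.randn(2, 10)]
lengths = torch.tensor([5, 3, 2])                       # must be sorted descending for enforce_sorted=True

padded = pad_sequence(seqs)                             # (max_len, batch, 10), zero-padded
packed = pack_padded_sequence(padded, lengths, enforce_sorted=True)

lstm = nn.LSTM(input_size=10, hidden_size=20)
packed_out, (hn, cn) = lstm(packed)                     # LSTM skips the padded steps
out, out_lengths = pad_packed_sequence(packed_out)      # back to a padded (max_len, batch, 20) tensor
print(out.shape, out_lengths)                           # torch.Size([5, 3, 20]) tensor([5, 3, 2])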
11.18.7. GPU CUDA
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu") if torch.cuda.is_available(): input = input.cuda() # GPU target = target.cuda() # GPU test_input = test_input.cuda() test_target = test_target.cuda()
seq: Model = Sequence() seq.double() seq = seq.to(device) # GPU
self.hidden = (torch.rand(self.levels, input.size(0), 51, dtype=torch.double), # layers, batch, hidden torch.rand(self.levels, input.size(0), 51, dtype=torch.double)) if torch.cuda.is_available(): self.hidden = (self.hidden[0].cuda(), self.hidden[1].cuda())
11.18.8. SGD
optim = torch.optim.SGD(model.parameters(), lr=0.01) lr = 0.5 * 1.2 optimizer = torch.optim.SGD(seq.parameters(), lr=lr, momentum=0.2) for s in range(STEPS): lr = lr / 1.2 print("lr", lr)
for g in optimizer.param_groups: g['lr'] = lr
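The same 1/1.2 decay per step can be expressed with a built-in scheduler instead of editing param_groups by
hand (a sketch of mine; seq and STEPS come from the snippet above):
import torch
optimizer = torch.optim.SGD(seq.parameters(), lr=0.5, momentum=0.2)
scheduler = torch.optim.lr_scheduler.ExponentialLR(optimizer, gamma=1 / 1.2)
for s in range(STEPS):
    # ... forward / backward / optimizer.step() ...
    scheduler.step()                                    # multiplies the lr of every param group by gamma
    print("lr", optimizer.param_groups[0]['lr'])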
11.19. Distributed - torch.distributed
11.19.1. overview
- DistributedDataParallel (DDP)
- torch.nn.parallel.DistributedDataParallel
FullyShardedDataParallel (FSDP) - "beta", higher level of complexity
- torch.distributed.fsdp
- the user indicates which submodules of the model to wrap together in an FSDP instance used for state
  sharding, or wraps submodules in FSDP instances manually
- if FSDP is used without wrapping submodules in separate instances, it falls back to operating similarly to
  DDP, but without bucketing
Two approaches to run:
- torch.distributed.launch
- torchrun (elastic)
When a model is wrapped with DistributedDataParallel, hooks are added to forward() and backward() to
communicate gradients between processes (see the minimal sketch below).
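A minimal DDP sketch (my own, not from the docs): it assumes a Net module and a train_loader that uses a
DistributedSampler already exist, and that the script is launched with torchrun so RANK/LOCAL_RANK/WORLD_SIZE
are set in the environment.
import os
import torch
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP

dist.init_process_group(backend="nccl")              # reads RANK/WORLD_SIZE/MASTER_ADDR from env
local_rank = int(os.environ["LOCAL_RANK"])
torch.cuda.set_device(local_rank)

model = Net().to(local_rank)
model = DDP(model, device_ids=[local_rank])          # adds the forward/backward communication hooks

optimizer = torch.optim.SGD(model.parameters(), lr=0.01)
for data, target in train_loader:
    data, target = data.to(local_rank), target.to(local_rank)
    optimizer.zero_grad()
    loss = torch.nn.functional.nll_loss(model(data), target)
    loss.backward()                                  # gradients are all-reduced across ranks here
    optimizer.step()

dist.destroy_process_group()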
11.19.2. huggingface
torch_xla and torch.distributed
11.19.3. torch.distributed.rpc
11.19.4. FSDP
https://pytorch.org/tutorials/intermediate/FSDP_tutorial.html https://github.com/pytorch/examples/blob/main/distributed/FSDP/T5_training.py
FSDP units - parts of model that will be sharded
- performance optimizations
- Mixed Precision - with BFloat16 resulted in ~5x improvement versus FP32
- Activation Checkpointing (AC) - reinvesting the freed memory from the checkpoints into larger batch size
- Transformer Wrapping Policy vs the default wrapping policy: 20-25% slower, but frees 33-38% of GPU memory; the freed memory can be used to increase batch size and overall speed.
Full shard strategy versus Zero2 (DDP) resulted in a 1.5x improvement.
The transformer wrapping policy and activation checkpointing were required to fit the T5 11B model on 3 nodes.
sharding_strategy:
- FULL_SHARD - default
- SHARD_GRAD_OP - Zero2 mode - model parameters are not freed after the forward pass, reducing communication needs
- NO_SHARD - DDP mode - just a copy of the model, only gradient synchronization is needed
- ex tutorial
import torch.distributed as dist world_size = 2 rank = 0 # per worker 0 ... ? fsdp_main(rank, world_size, batch_size, test_batch_size def fsdp_main(rank, world_size, args): setup(rank, world_size) transform=transforms.Compose([ transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,)) ]) dataset1 = datasets.MNIST('../data', train=True, download=True, transform=transform) dataset2 = datasets.MNIST('../data', train=False, transform=transform) sampler1 = DistributedSampler(dataset1, rank=rank, num_replicas=world_size, shuffle=True) sampler2 = DistributedSampler(dataset2, rank=rank, num_replicas=world_size) train_kwargs = {'batch_size': args.batch_size, 'sampler': sampler1} test_kwargs = {'batch_size': args.test_batch_size, 'sampler': sampler2} cuda_kwargs = {'num_workers': 2, 'pin_memory': True, 'shuffle': False} train_kwargs.update(cuda_kwargs) test_kwargs.update(cuda_kwargs) train_loader = torch.utils.data.DataLoader(dataset1,**train_kwargs) test_loader = torch.utils.data.DataLoader(dataset2, **test_kwargs) my_auto_wrap_policy = functools.partial( size_based_auto_wrap_policy, min_num_params=100 ) torch.cuda.set_device(rank) init_start_event = torch.cuda.Event(enable_timing=True) init_end_event = torch.cuda.Event(enable_timing=True) model = Net().to(rank) model = FSDP(model, fsdp_auto_wrap_policy=my_auto_wrap_policy, cpu_offload=CPUOffload(offload_params=True)) optimizer = optim.Adadelta(model.parameters(), lr=args.lr) scheduler = StepLR(optimizer, step_size=1, gamma=args.gamma) init_start_event.record() for epoch in range(1, args.epochs + 1): train(args, model, rank, world_size, train_loader, optimizer, epoch, sampler=sampler1) test(model, rank, world_size, test_loader) scheduler.step() init_end_event.record() if rank == 0: print(f"CUDA event elapsed time: {init_start_event.elapsed_time(init_end_event) / 1000}sec") print(f"{model}") if args.save_model: # use a barrier to make sure training is done on all ranks dist.barrier() # state_dict for FSDP model is only available on Nightlies for now states = model.state_dict() if rank == 0: torch.save(states, "mnist_cnn.pt") cleanup()
- ex t5
from torch.distributed.fsdp import ( FullyShardedDataParallel as FSDP, CPUOffload, MixedPrecision, BackwardPrefetch, ShardingStrategy, FullStateDictConfig, StateDictType, ) from torch.utils.data.distributed import DistributedSampler class train_config: model_name: str="t5-base" run_validation: bool=True batch_size_training: int=4 num_workers_dataloader: int=2 lr: float=0.002 weight_decay: float=0.0 gamma: float= 0.85 use_fp16: bool=False mixed_precision: bool=True save_model: bool=False class fsdp_config: mixed_precision: bool=True use_fp16: bool=False seed: int=42 fsdp_activation_checkpointing: bool=True limit_all_gathers: bool=True sharding_strategy: ShardingStrategy = ShardingStrategy.FULL_SHARD #HYBRID_SHARD, SHARD_GRAD_OP checkpoint_type: StateDictType = StateDictType.FULL_STATE_DICT # alternatively can use SHARDED_STATE_DICT to avoid OOMs save_optimizer: bool=False from torch.distributed.fsdp import ( # FullyShardedDataParallel as FSDP, # CPUOffload, MixedPrecision, # BackwardPrefetch, # ShardingStrategy, ) # requires grad scaler in main loop fpSixteen = MixedPrecision( param_dtype=torch.float16, # Gradient communication precision. reduce_dtype=torch.float16, # Buffer precision. buffer_dtype=torch.float16, ) bfSixteen = MixedPrecision( param_dtype=torch.bfloat16, # Gradient communication precision. reduce_dtype=torch.bfloat16, # Buffer precision. buffer_dtype=torch.bfloat16, ) bfSixteen_working = MixedPrecision( param_dtype=torch.float32, reduce_dtype=torch.bfloat16, buffer_dtype=torch.bfloat16, ) fp32_policy = MixedPrecision( param_dtype=torch.float32, reduce_dtype=torch.float32, buffer_dtype=torch.float32, ) def get_policies(cfg, rank): """establish current policies for mixed precision and fsdp wrapping""" mixed_precision_policy = None wrapping_policy = None # mixed precision ----- if cfg.mixed_precision: bfloat_available = bfloat_support() if bfloat_available and not cfg.use_fp16: mixed_precision_policy = policies.bfSixteen if rank == 0: print(f"bFloat16 enabled for mixed precision - using bfSixteen policy") elif cfg.use_fp16: mixed_precision_policy = policies.fpSixteen if rank == 0: print(f"FP16 enabled. ") else: # mixed_precision_policy = policies.fpSixteen print( f"bFloat16 support not present. 
Will use FP32, and not mixed precision" ) wrapping_policy = policies.get_t5_wrapper() return mixed_precision_policy, wrapping_policy def setup(): # initialize the process group dist.init_process_group("nccl") def cleanup(): dist.destroy_process_group() local_rank = int(os.environ['LOCAL_RANK']) rank = int(os.environ['RANK']) world_size = int(os.environ['WORLD_SIZE']) run_validation = True track_memory = True epochs = 1 batch_size = 1 test_batch_size = 1 sampler1 = DistributedSampler(train_dataset, rank=rank, num_replicas=world_size, shuffle=True) sampler2 = DistributedSampler(val_dataset, rank=rank, num_replicas=world_size) setup() train_kwargs = {'batch_size': batch_size, 'sampler': sampler1} test_kwargs = {'batch_size': test_batch_size, 'sampler': sampler2} cuda_kwargs = {'num_workers': 2, 'pin_memory': True, 'shuffle': False} train_kwargs.update(cuda_kwargs) test_kwargs.update(cuda_kwargs) train_loader = torch.utils.data.DataLoader(train_dataset,**train_kwargs) val_loader = torch.utils.data.DataLoader(val_dataset, **test_kwargs) torch.cuda.set_device(local_rank) mixed_precision_policy, t5_auto_wrap_policy = get_policies(train_config, rank) # Apply FSDP wrapping to the model model = FSDP(model, auto_wrap_policy=t5_auto_wrap_policy, mixed_precision=mixed_precision_policy, sharding_strategy=fsdp_config.sharding_strategy, device_id=torch.cuda.current_device(), limit_all_gathers=fsdp_config.limit_all_gathers) # if fsdp_config.fsdp_activation_checkpointing: # policies.apply_fsdp_checkpointing(model) # Set up optimizer and scheduler optimizer = optim.AdamW(model.parameters(), lr=train_config.lr) scheduler = StepLR(optimizer, step_size=1, gamma=train_config.gamma) best_val_loss = float("inf") curr_val_loss = float("inf") file_save_name = "T5-model-" if rank == 0: time_of_run = get_date_of_run() dur = [] train_acc_tracking = [] val_acc_tracking = [] training_start_time = time.time() if rank == 0 and track_memory: mem_alloc_tracker = [] mem_reserved_tracker = [] for epoch in range(1, epochs + 1): t0 = time.time() train_accuracy = train(model, rank, world_size, train_loader, optimizer, epoch, sampler=sampler1) if run_validation: curr_val_loss = validation(model, rank, world_size, val_loader) scheduler.step() if rank == 0: print(f"--> epoch {epoch} completed...entering save and stats zone") dur.append(time.time() - t0) train_acc_tracking.append(train_accuracy.item()) if run_validation: val_acc_tracking.append(curr_val_loss.item())
- troubleshooting
RuntimeError: Expected a 'cuda' device type for generator but found 'cpu'
- happens when the default device is set with torch.set_default_device("cuda")
- a shuffled Sampler always creates generator = torch.Generator(), which is a CPU generator
- Solution: disable shuffle or set torch.set_default_device("cpu")
RuntimeError: cannot pin 'torch.cuda.FloatTensor' only dense CPU tensors can be pinned
- solution: keep everything on the CPU, as in the tutorial (only dense CPU tensors can be pinned)
- keep dataset items on the CPU
CUDA error: invalid device ordinal
- 1694694477 worker-0: CUDA kernel errors might be asynchronously reported at some other API call, so the stacktrace below might be incorrect.
- 1694694477 worker-0: For debugging consider passing CUDA_LAUNCH_BLOCKING=1.
- 1694694477 worker-0: Compile with `TORCH_USE_CUDA_DSA` to enable device-side assertions.
- Solution (from memory, not verified): use .to(device) rather than .to(rank)
Timed out initializing process group in store based barrier on rank
- increase: torch.distributed.init_process_group(timeout=datetime.timedelta(seconds=1800))
RuntimeError: Cannot re-initialize CUDA in forked subprocess. To use CUDA with multiprocessing, you must use the 'spawn' start method
- the problem came from pickle.load; with read_image there was no problem
try: torch.multiprocessing.set_start_method('spawn',force=True) except RuntimeError: pass
11.19.5. elastic (launch)
torchrun provides a superset of the functionality of torch.distributed.launch (example invocations below)
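Typical invocations (a sketch; train.py, the rendezvous id and the endpoint host1:29400 are placeholders):
torchrun --standalone --nproc_per_node=4 train.py                     # single node, 4 GPUs
torchrun --nnodes=2 --nproc_per_node=8 --rdzv_id=100 --rdzv_backend=c10d --rdzv_endpoint=host1:29400 train.py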
11.19.6. torch.distributed.launch
- dist.init_process_group(backend, init_method)
links
11.19.7. KubeFlow PyTorchJob
$ env for pod/pytorch-simple-worker-0:
KUBERNETES_SERVICE_PORT_HTTPS=443 NVIDIA_VISIBLE_DEVICES=all KUBERNETES_SERVICE_PORT=443 PYTHONUNBUFFERED=0 HOSTNAME=pytorch-simple-worker-0 MASTER_PORT=23456 PWD=/workspace NVIDIA_DRIVER_CAPABILITIES=compute,utility WORLD_SIZE=2 HOME=/root KUBERNETES_PORT_443_TCP=tcp://10.96.0.1:443 PYTORCH_VERSION=2.0.1 MASTER_ADDR=pytorch-simple-master-0 TERM=xterm SHLVL=1 KUBERNETES_PORT_443_TCP_PROTO=tcp KUBERNETES_PORT_443_TCP_ADDR=10.96.0.1 LD_LIBRARY_PATH=/usr/local/nvidia/lib:/usr/local/nvidia/lib64 RANK=1 KUBERNETES_SERVICE_HOST=10.96.0.1 KUBERNETES_PORT=tcp://10.96.0.1:443 KUBERNETES_PORT_443_TCP_PORT=443
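The operator injects MASTER_ADDR/MASTER_PORT/WORLD_SIZE/RANK, which is exactly what the default env://
init method reads, so inside the pod something like this sketch is enough:
import os
import torch
import torch.distributed as dist

# MASTER_ADDR, MASTER_PORT, WORLD_SIZE and RANK are set by the PyTorchJob operator (see env above)
backend = "nccl" if torch.cuda.is_available() else "gloo"
dist.init_process_group(backend=backend, init_method="env://")
print("rank", dist.get_rank(), "of", dist.get_world_size())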
11.19.8. investigate
import torch print("distributed available", torch.distributed.is_available()) print("distributed initilized", torch.distributed.is_initialized()) # -- CUDA torch.cuda.is_available() # True torch.cuda.device_count() # 1 torch.cuda.current_device() # 0 torch.cuda.device(0) # <torch.cuda.device at 0x7efce0b03be0> torch.cuda.get_device_name(0) # 'GeForce GTX 950M' print("cuda") print(torch.cuda.is_available()) # True print(torch.cuda.device_count()) # 1 print(torch.cuda.current_device()) # 0 print(torch.cuda.device(0)) # <torch.cuda.device at 0x7efce0b03be0> print(torch.cuda.get_device_name(0)) # 'GeForce GTX 950M' print()
11.19.9. links
- main https://pytorch.org/docs/stable/distributed.html
- tutorial https://pytorch.org/tutorials/beginner/dist_overview.html
- https://pyimagesearch.com/2021/10/18/introduction-to-distributed-training-in-pytorch/
- 2.0 news https://pytorch.org/get-started/pytorch-2.0/#distributed
- DDP https://pytorch.org/docs/stable/notes/ddp.html
11.20. retain_graph
https://pytorch.org/docs/stable/autograd.html
loss.backward(retain_graph=True)
LSTM slows down because the hidden state (and its graph) is kept between batches. Solutions (see the sketch below):
- detach/repackage the hidden state in between batches.
- hidden.detach_()
- hidden = hidden.detach()
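A sketch of repackaging the hidden state between batches so the graph from the previous batch is not
retained (lstm, loader, criterion and optimizer are placeholders for objects built elsewhere):
hidden = None                                        # nn.LSTM treats None as zero-initialized state
for x, y in loader:                                  # x: (seq_len, batch, input_size)
    out, hidden = lstm(x, hidden)
    hidden = tuple(h.detach() for h in hidden)       # keep the values, drop the old graph
    loss = criterion(out[-1], y)
    optimizer.zero_grad()
    loss.backward()                                  # no retain_graph needed now
    optimizer.step()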
11.21. memory management
if a is a tensor:
- a.to(torch.device("cpu")) or a.to(torch.device("cuda:0")) - move the tensor between devices
making sure a new tensor is created on the same device as t1 (see also the idioms below):
- dev = t1.device   # or t1.get_device() for the GPU index
- b = torch.tensor(t2.shape).to(dev)
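Equivalent idioms that work for both CPU and GPU tensors (a small sketch; t1 and t2 are arbitrary examples):
import torch

t1 = torch.randn(3, 4, device="cuda:0" if torch.cuda.is_available() else "cpu")
t2 = torch.randn(5)
b = torch.zeros_like(t1)                      # same shape, dtype and device as t1
c = torch.zeros(t2.shape, device=t1.device)   # explicit device= keyword
d = t2.to(t1.device)                          # move an existing tensor next to t1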
Using Multiple GPUs:
- Data Parallelism, where we divide batches into smaller batches and process these smaller batches in parallel on multiple GPUs.
- Model Parallelism, where we break the neural network into smaller sub-networks and execute these sub-networks on different GPUs (sketches below).
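Minimal sketches of both approaches (my own; Net and batch are placeholders, and the model-parallel example
assumes at least two GPUs; for serious data parallelism DistributedDataParallel is preferred over
nn.DataParallel):
import torch
import torch.nn as nn

# Data parallelism: nn.DataParallel splits the batch along dim 0 across GPUs and gathers the outputs
model = nn.DataParallel(Net()).cuda()
out = model(batch)

# Model parallelism: put submodules on different GPUs and move activations between them
class TwoGPUNet(nn.Module):
    def __init__(self):
        super().__init__()
        self.part1 = nn.Linear(100, 50).to("cuda:0")
        self.part2 = nn.Linear(50, 10).to("cuda:1")

    def forward(self, x):
        x = self.part1(x.to("cuda:0"))
        return self.part2(x.to("cuda:1"))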
del out, loss - drop references to tensors/models; torch.cuda.empty_cache() - release cached GPU memory that is no longer referenced
with torch.no_grad(): - by default PyTorch builds a computational graph during the forward pass and allocates buffers to store gradients and intermediate values needed for the backward pass; no_grad() turns this off, which saves memory during inference.
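A tiny sketch (model and data stand for an already-built model and input batch):
import torch

with torch.no_grad():        # no graph is built, so no buffers for gradients/intermediate values
    output = model(data)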
cuDNN can provide a lot of optimisations that bring down memory usage:
- torch.backends.cudnn.benchmark = True
- torch.backends.cudnn.enabled = True
Using 16-bit Floats
- model = model.half() # convert the model to 16-bit (see the sketch below for keeping BatchNorm in float32)
- input = input.half() # convert the inputs to 16-bit
- issues:
- batch-norm layers have convergence issues with half precision floats. If that's the case with you, make sure that batch norm layers are float32
- You can have overflow issues with 16-bit float. Once, I remember I had such an overflow while trying to store the Union area of two bounding boxes (for computation of IoUs) in a float16. So make sure you have a realistic bound on the value you are trying to save in a float16.
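A sketch of the half-precision conversion that keeps BatchNorm layers in float32 (the helper name is mine,
not a PyTorch API):
import torch
import torch.nn as nn

def half_except_batchnorm(model: nn.Module) -> nn.Module:
    model.half()                                  # convert all parameters and buffers to float16
    for m in model.modules():
        if isinstance(m, (nn.BatchNorm1d, nn.BatchNorm2d, nn.BatchNorm3d)):
            m.float()                             # convergence issues in fp16 -> keep BN in fp32
    return model

# usage: model = half_except_batchnorm(model); input = input.half()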
11.22. troubleshooting
Input type (torch.FloatTensor) and weight type (torch.cuda.FloatTensor)
- dataset on CPU, model on GPU
- solution: in Dataset.__getitem__(self, idx): return image.to(device), torch.tensor(label, dtype=torch.long).to(device)
"RuntimeError: Expected a 'cuda' device type for generator but found 'cpu'"
- solution:
generator = torch.Generator(device=device) train_loader: DataLoader = DataLoader(train_dataset, shuffle=True, batch_size=BATCH_SIZE, generator=generator)
AttributeError: 'collections.OrderedDict' object has no attribute 'eval'
- torch.load(file_path) here returns a saved state_dict (an OrderedDict), not a model; create the model first and load the weights into it:
model = TempModel() model.load_state_dict(torch.load(file_path))
torch.cuda.OutOfMemoryError: CUDA out of memory. If reserved memory is >> allocated memory try setting max_split_size_mb to avoid fragmentation.
os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "max_split_size_mb:256"
11.23. plot learning curve
LOGFILE=torch/logs/log-2023-09-10-local.txt cat $LOGFILE | grep "loss" | cut -d ' ' -f 4 | cut -d ',' -f 1 > /tmp/loss cat $LOGFILE | grep "loss" | cut -d ' ' -f 7 | cut -d ',' -f 1 > /tmp/acc python -c " acc = [float(x[:-1]) for x in open('/tmp/acc', 'r').readlines()] loss = [float(x[:-1]) for x in open('/tmp/loss', 'r').readlines()] import numpy as np acc = np.array(acc) loss = np.array(loss) acc = (acc - np.min(acc)) / (np.max(acc) - np.min(acc)) loss = (loss - np.min(loss)) / (np.max(loss) - np.min(loss)) import matplotlib.pyplot as plt plt.plot(list(range(len(acc))), acc, label='accuracy') plt.plot(list(range(len(loss))), loss, label='loss') plt.legend() plt.title('Scaled accuracy and loss') plt.savefig('/tmp/a.png') "
11.24. links
12. TODO PaddlePaddle 飞桨
PArallel Distributed Deep LEarning https://www.paddlepaddle.org.cn/