Category: SAS

Spark practice (3): clean and sort Social Security numbers

Sample.txt
Requirements:
1. separate valid SSN and invalid SSN
2. count the number of valid SSN
402-94-7709 
283-90-3049
124-01-2425
1231232
088-57-9593
905-60-3585
44-82-8341
257581087
327-84-0220
402-94-7709

Thoughts

SSN indexed data is commonly seen and stored in many file systems. The trick to accelerate the speed on Spark is to build a numerical key and use the sortByKey operator. Besides, the accumulator provides a global variable existing across machines in a cluster, which is especially useful for counting data.

Single machine solution

#!/usr/bin/env python
# coding=utf-8
htable = {}
valid_cnt = 0

# Valid SSNs are stored in htable keyed by their 9-digit numeric value
# (which also dedupes repeats); anything else goes to sample_bad.txt.
with open('sample.txt') as infile, open('sample_bad.txt', 'w') as outfile:
    for line in infile:
        line = line.strip()
        nums = line.split('-')
        key = -1
        if line.isdigit() and len(line) == 9:
            # plain 9-digit form, e.g. 257581087
            key = int(line)
        # dashed AAA-GG-SSSS form; the original compared map(len, nums) to a
        # list, which is always False on Python 3 -- materialize the lengths.
        # Also require all-digit groups so int() below cannot raise.
        if (len(nums) == 3 and [len(p) for p in nums] == [3, 2, 4]
                and all(p.isdigit() for p in nums)):
            key = 1000000 * int(nums[0]) + 10000 * int(nums[1]) + int(nums[2])
        if key == -1:
            # original wrote 'n' -- the backslash of '\n' was lost in the post
            outfile.write(line + '\n')
        elif key not in htable:
            htable[key] = line
            valid_cnt += 1

# iterating keys in ascending numeric order yields the SSNs sorted
with open('sample_sorted.txt', 'w') as outfile:
    for key in sorted(htable):
        outfile.write(htable[key] + '\n')

print(valid_cnt)

Cluster solution

#!/usr/bin/env python
# coding=utf-8
import pyspark
sc = pyspark.SparkContext()
valid_cnt = sc.accumulator(0)

def is_validSSN(l):
    """Return True if *l* (after stripping) is a well-formed SSN.

    Two accepted shapes: a plain 9-digit string (e.g. '257581087') or the
    dashed AAA-GG-SSSS form (e.g. '402-94-7709').
    """
    l = l.strip()
    # plain 9-digit form
    if l.isdigit() and len(l) == 9:
        return True
    # dashed form: three all-digit groups of lengths 3, 2 and 4.
    # The original tested `map(len, nums) == [3, 2, 4]`, which is always
    # False on Python 3 (map() returns an iterator); the digit check also
    # keeps set_key()'s int() conversions from raising later.
    nums = l.split('-')
    return (len(nums) == 3
            and [len(p) for p in nums] == [3, 2, 4]
            and all(p.isdigit() for p in nums))

def set_key(l):
    """Map a valid SSN line to ``(numeric_key, stripped_line)``.

    The key is the SSN as a 9-digit integer so that sortByKey() orders the
    records numerically.  Assumes *l* already passed is_validSSN().
    """
    # NOTE(review): bumping an accumulator inside a map() transformation can
    # over-count -- Spark only guarantees exactly-once accumulator updates
    # inside actions, and rdd2 below is evaluated more than once (collect +
    # saveAsTextFile).  Kept for parity with the original post.
    global valid_cnt
    valid_cnt += 1
    l = l.strip()
    if len(l) == 9:
        # plain 9-digit form
        return (int(l), l)
    area, group, serial = l.split('-')
    return (1000000 * int(area) + 10000 * int(group) + int(serial), l)

rdd = sc.textFile('sample.txt')

# invalid lines only
rdd1 = rdd.filter(lambda x: not is_validSSN(x))

# valid lines: dedupe, key numerically, sort, then drop the key.
# (The original chained `.map`/`.sortByKey` on bare continuation lines,
# which is a syntax error -- wrap the chain in parentheses.)
rdd2 = (rdd.filter(is_validSSN)
           .distinct()
           .map(set_key)
           .sortByKey()
           .map(lambda x: x[1]))

for x in rdd1.collect():
    print('Invalid SSN\t', x)

for x in rdd2.collect():
    print('valid SSN\t', x)

print('\nNumber of valid SSN is {}'.format(valid_cnt))

# Save RDD to file system
rdd1.saveAsTextFile('sample_bad')
rdd2.saveAsTextFile('sample_sorted')
sc.stop()

%SVD macro with BY-Processing

For the Regularized Discriminant Analysis cross validation, we need to compute the SVD for each pair of \((\lambda, \gamma)\), and the factorization result will be fed to the downdating algorithm to obtain the leave-one-out variance-covariance matrix \(\hat{\Sigma}_{k\setminus v}(\lambda, \gamma)\). The code is a direct modification of my original SVD macro @here. It is much more than simply putting a BY statement in PROC PRINCOMP, but it is still easy to do. I used the within-class CSSCP from the Iris data shipped with SAS and cross-verified the result with R. The %CallR macro is used here.

The screenshot below shows the SAS macro generates the same result as from R. Here is the test code.


/* Test driver for %SVD_bystmt: within-class CSSCP of the iris data,
   cross-checked against R's svd() via the %CallR macro.
   (Path separators below were lost in the original post, e.g.
   "d:datasigma.csv" -- restored to "d:\data\...".) */
dm log 'clear';
proc discrim data=sashelp.iris noclassify noprint
     outstat=iris_stat(WHERE=(_TYPE_ = "CSSCP" & Species ^= " "))
     ;
     class Species;
     var SepalLength SepalWidth PetalLength PetalWidth;
run;

/* NOTE(review): &covars and the input data set SIGMA must be defined
   before this call -- confirm against the full post */
%svd_bystmt(sigma, output_v, output_s, output_u, &covars, Species, , nfac=0);

/* export the by-class CSSCP to CSV so R can read it */
data _null_;
    file "d:\data\sigma.csv" dlm=",";
    set iris_stat;
    if _n_=1 then do;
        put 'Species' ','
            '_NAME_' ','
            'SepalLength' ','
            'SepalWidth' ','
            'PetalLength' ','
            'PetalWidth';
    end;
    put Species _NAME_ SepalLength SepalWidth PetalLength PetalWidth;
run;

/* forward slashes inside the R program avoid R's backslash escaping */
%RScript(d:\data\rscript.r)
cards4;
dat=read.csv("d:/data/sigma.csv", header=1, as.is=1)
head(dat)
sigma=dat[, c(-1, -2)]
sigmasvd=by(sigma, dat$Species, svd)
sigmasvd$Setosa$d
sigmasvd$Virginica$d
sigmasvd$Setosa$v
;;;;
run;

%CallR(d:/data/rscript.r, d:/data/rlog1.txt);

/* echo the R log into the SAS log */
data _null_;
    infile "d:\data\rlog1.txt";
    input;
    put _infile_;
run;
/* print the singular values and Setosa's V for eyeballing against R */
data _null_;
    do until (eof1);
        set output_s end=eof1;
        put @1 Species= @24 Number= @40 Singuval= 8.2;
    end;
    put " ";
    do until (eof2);
        set output_v end=eof2;
        where Species="Setosa";
        put Prin1 8.5 Prin2 8.5 Prin3 8.5 Prin4 8.5;
    end;
    stop;
run;


/* %SVD_bystmt: SVD via PROC PRINCOMP with optional BY-group processing.
   Parameters:
     input_dsn  - input data set (e.g. a CSSCP matrix per BY group)
     output_V   - output data set for the right singular vectors V
     output_S   - output data set for eigen/singular values
     output_U   - output data set for the left singular vectors U (= XV/S)
     input_vars - analysis variables
     by_var     - optional BY variable (blank = single group)
     ID_var     - optional ID variable carried into output_U
     nfac=0     - number of factors to keep (0 = all) */
%macro SVD_bystmt(
          input_dsn,
          output_V,
          output_S,
          output_U,
          input_vars,
          by_var,
          ID_var,
          nfac=0
);

%local blank para EV USCORE n pos dsid nobs nstmt
       shownote showsource nbylevel bystmt;

%let shownote=%sysfunc(getoption(NOTES));
%let showsource=%sysfunc(getoption(SOURCE));
options nonotes nosource;

%let blank=%str( );
%let EV=EIGENVAL;
%let USCORE=USCORE;

%if &by_var eq &blank %then %do;
    /* no BY variable: empty BY statement, a single level
       (the original post lost this assignment to a mojibake character) */
    %let bystmt = %str( );
    %let nbylevel = 1;
%end;
%else %do;
    %let bystmt = by &by_var;
%end;

%let n=%sysfunc(countW(&input_vars));

%let dsid=%sysfunc(open(&input_dsn));
%let nobs=%sysfunc(attrn(&dsid, NOBS));
%let dsid=%sysfunc(close(&dsid));
%if &nfac eq 0 %then %do;
    %let nstmt=%str( );
    %let nfac=&n;
%end;
%else %do;
    %let x=%sysfunc(notdigit(&nfac, 1));
    %if &x eq 0 %then %do;
        %let nfac=%sysfunc(min(&nfac, &n));
        %let nstmt=%str(n=&nfac);
    %end;
    %else %do;
        %put ERROR: Only accept non-negative integer.;
        %goto exit;
    %end;
%end;

/* calculate U=XV/S */
%if &output_U ne %str() %then %do;
    %let outstmt= out=&output_U.(keep=&by_var &ID_var Prin:);
%end;
%else %do;
    %let outstmt=%str( );
%end;

/* build index for input data set */
proc datasets library=work nolist;
    modify &input_dsn;
    index create &by_var;
run;quit;

%let options=noint cov noprint &nstmt;

proc princomp data=&input_dsn
     /* out=&input_dsn._score */
     &outstmt
     outstat=&input_dsn._stat(where=(_type_ in ("&USCORE", "&EV"))) &options;
    &bystmt;
    var &input_vars;
run;

/*
  need to check if the by_var is Char or Numeric, then set the
  format accordingly
*/
data _null_;
    set &input_dsn;
    type=vtype(&by_var);
    if type="C" then
        call symput("bylevelfmtvar", "$bylevelfmt");
    else
        call symput("bylevelfmtvar", "bylevelfmt");
run;


/* map each BY level to an ordinal 1..n via a format, so a BY value can be
   turned into a row index of the temporary singular-value array below */
data __bylevelfmt;
    set &input_dsn._stat end=eof;
    &bystmt;
    retain _nbylevel 0;
    retain fmtname "&bylevelfmtvar";
    if first.&by_var then do;
        _nbylevel+1;
        start=&by_var;
        label=_nbylevel;
        output;
        keep label start fmtname ;
    end;
    if eof then call symput("_nbylevel", _nbylevel);
run;
proc format cntlin=__bylevelfmt;
run;
%put &_nbylevel;


%if &output_V ne &blank %then %do;
proc transpose data=&input_dsn._stat(where=(_TYPE_="&USCORE"))
     out=&output_V.(rename=(_NAME_=variable))
     name=_NAME_;
    &bystmt;
    var &input_vars;
    id _NAME_;
    format &input_vars 8.6;
run;
%end;

/* recompute Proportion */
%if &output_S ne %str() %then %do;
data &output_S;
    set &input_dsn._stat ;
    &bystmt;
    where _TYPE_="EIGENVAL";
    array _s{*} &input_vars;
    array _x{&nfac, 3} _temporary_;
    Total=sum(of &input_vars, 0);
    _t=0;
    do _i=1 to &nfac;
        _x[_i, 1]=_s[_i];
        _x[_i, 2]=_s[_i]/Total;
        if _i=1 then
            _x[_i, 3]=_x[_i, 2];
        else
            _x[_i, 3]=_x[_i-1, 3]+_x[_i, 2];
        _t+sqrt(_x[_i, 2]);
    end;
    do _i=1 to &nfac;
        Number=_i;
        EigenValue=_x[_i, 1]; Proportion=_x[_i, 2]; Cumulative=_x[_i, 3];
        S=sqrt(_x[_i, 2])/_t; SinguVal=sqrt(_x[_i, 1] * &nobs/&_nbylevel);
        keep &by_var Number EigenValue Proportion Cumulative S SinguVal;
        output;
    end;
run;
%end;

%if &output_U ne %str() %then %do;
data &output_U;
    array _S{&_nbylevel, &nfac} _temporary_;
    if _n_=1 then do;
        do until (eof);
            set &output_S(keep=&by_var Number SinguVal) end=eof;
            where Number<=&nfac;
            %if &by_var eq &blank %then %do;
                _row=1;
            %end;
            %else %do;
                _row = input(put(&by_var, &bylevelfmtvar..), best.);
            %end;
            _S[_row, number]=SinguVal;
            /* BUGFIX: the original guarded _S[_row, j] with an undefined j;
               guard the element just stored so a ~0 singular value cannot
               blow up the division below */
            if abs(_S[_row, number]) < CONSTANT('MACEPS') then _S[_row, number]=CONSTANT('BIG');
        end;
    end;
    set &output_U;
    array _A{*} Prin1-Prin&nfac;
    %if &by_var eq &blank %then %do;
        _row=1;
    %end;
    %else %do;
        _row = input(put(&by_var, &bylevelfmtvar..), best.);
    %end;
    do _j=1 to dim(_A);
        _A[_j]=_A[_j]/_S[_row, _j];
    end;
    keep &by_var &ID_var Prin1-Prin&nfac ;
run;
%end;

proc datasets library=work nolist;
    modify &input_dsn;
    index delete &by_var;
run;quit;


%exit:
options &shownote &showsource;
%mend;

SAS crash with BULKLOAD and ODBC Driver 11 for SQL Server

SAS 9.4 (TS1M2) crashes hard when using BULKLOAD=YES with the latest Microsoft ODBC Driver 11 for SQL Server. For a brief moment you may see in the SAS log ERROR: BCP initialize error: [Microsoft][ODBC Driver 11 for SQL Server]Connection is not enable…

Experient downdating algorithm for Leave-One-Out CV in RDA

In this post, I want to demonstrate a piece of experiment code for downdating algorithm for Leave-One-Out (LOO) Cross Validation in Regularized Discriminant Analysis [1].

In LOO CV, the program needs to calculate the inverse of \(\hat{\Sigma}_{k\setminus v}(\lambda, \gamma)\) for each observation \(v\) to obtain its new scoring function at a given pair of regularization parameters \((\lambda, \gamma)\), where \(k\setminus v\) indicates class \(k\) excluding observation \(v\).

As mentioned in [1], to obtain the inverse of this variance-covariance matrix for class \(k\), it is not sufficient to just remove observation \(v\) and recalculate it; instead, we try to calculate $$W_{k\setminus v}(\lambda, \gamma)\hat{\Sigma}^{-1}_{k\setminus v}(\lambda, \gamma),$$ which is equivalent to computing the inverse of:

\[
  W_k(\lambda)\hat{\Sigma}^{-1}_k(\lambda) - \gamma \lVert Z_v \rVert^2 \cdot I - (1-\gamma) Z_v Z_v^{t}
\]

In this formula, the first element \(W_k(\lambda)\hat{\Sigma}^{-1}_k(\lambda)\) does not involve quantities varying along with observation \(v\) and can be pre-computed. The latter two can be easily updated on the fly as we scan through each individual observation.

With this in mind, we use the iris data for demonstration. Note that the SAS code here is for illustrating the concept of the downdating algorithm above for a given pair of \((\lambda, \gamma)\). The computing is divided into several steps. (1) We first use PROC DISCRIM to calculate the CSSCP matrix, total weight and variable mean by class. These quantities correspond to \(\Sigma_k\), \(\bar{X}_k\) and \(W_k\) in the original paper. (2) We then use the %SVD macro @here to obtain key elements for later updating.

Note that in order to obtain the CSSCP, MEAN and total weights, in addition to PROC DISCRIM, we can also use PROC CORR. The advantage is that we can keep all computing within the framework of SAS/Base, so as to benefit budget-sensitive businesses and maximize compatibility. The downside is that the data set needs to be sorted or indexed to obtain the CSSCP by class, and the data has to be passed twice: once for the pooled CSSCP and once for the class-specific CSSCP.

(to be completed…)

   /* Study the downdating algorithm for LOO CV in RDA.
      NOTE(review): this section was recovered from a whitespace-mangled
      paste; the mojibake characters standing for "&target;" and "&lambda;"
      and a few obvious typos were restored, but the code is explicitly
      unfinished ("MORE CODE TO COME") -- re-verify before use. */
%let lambda = 0.3;
%let gamma  = 0.6;
%let target = Species;
%let covars = SepalLength SepalWidth PetalLength PetalWidth;

/* step 0. Obtain key statistics from DISCRIM */
proc discrim data=iris outstat=stat noprint noclassify;
    class &target;
    var &covars;
run;

/* step 1. Obtain Sigma_k(lambda) & Sigma_k(lambda, gamma)
           S_k = (_TYPE_="CSSCP" & &target = "something")
           S   = (_TYPE_="CSSCP" & &target = " ")
           W_k = (_TYPE_="N" & &target = "something") - 1
           W   = (_TYPE_="N" & &target = " ") - &nClass + 1
           Sigma_k(lambda) = S_k(lambda) / W_k(lambda)
              where:
                    S_k(lambda) = (1-lambda)*S_k + lambda*S
                    W_k(lambda) = (1-lambda)*W_k + lambda*W
           Sigma_k(lambda, gamma) = (1-lambda)*Sigma_k(lambda)
                                    + gamma*trace[Sigma_k(lambda)]/p*I
*/
proc sql noprint;
    select distinct &target into :targetvalues separated by " "
    from stat
    where _TYPE_="CSSCP"
    ;
    select distinct _NAME_ into :covars separated by " "
    from stat
    where _TYPE_="CSSCP"
    ;
quit;
%put &targetvalues;
%put &covars;

%let nClass=%sysfunc(countw("&targetvalues"));
%put &nClass;
%let nVars=%sysfunc(countw("&covars"));
%put &nVars;

data weights;
    set stat;
    where _TYPE_="N";
    array _nv{*} _numeric_;
    /* the pooled record (&target blank) carries W = N - nClass + 1
       forward via RETAIN to the class-specific records */
    if &target="" then _total=_nv[1]-&nClass+1;
    retain lambda &lambda;
    retain _total;
    weight=(1-&lambda) * (_nv[1]-1) + &lambda*_total;
    keep &target weight lambda;
    if &target ^= "";
run;

proc sort data=stat out=sigma;
    by _TYPE_ &target;
    WHERE _TYPE_ = "CSSCP" & &target ^= " ";
run;
proc sort data=weights;
    by &target;
run;

/* step 1.1 Update Sigma_k(lambda)
   = ((1-lambda)*S_k + lambda*S) / ((1-lambda)*W_k + lambda*W) */
data sigma;
    array _sigma{&nVars, &nVars} _temporary_;
    array _x{&nVars} &covars;
    array _y{&nClass} _temporary_;
    if _n_=1 then do;
        /* cache the pooled CSSCP and the per-class weights once */
        _iter=1;
        do until (eof1);
            set stat(where=(_TYPE_="CSSCP" & &target = " ")) end=eof1;
            do _j=1 to &nVars;
                _sigma[_iter, _j]=_x[_j];
            end;
            _iter+1;
        end;
        _iter=1;
        do until (eof2);
            set weights end=eof2;
            _y[_iter]=weight;
            _iter+1;
        end;
        put _y[3]=;
        _target=&target;
        _iter=0;
    end;
    modify sigma;
    retain weight;
    retain _target;
    _TYPE_="COV";
    if &target^=_target then do;
        _iter+1;
        _i=0;
        weight=_y[_iter];
        _target=&target;
    end;
    _i+1;
    put "&covars";
    put &covars;
    do _j=1 to &nVars;
        _x[_j]= ((1-&lambda)*_x[_j] + &lambda*_sigma[_i, _j]);
    end;
    put &covars;
    put "-----------------------------------------------------------------";
    replace;
run;

/* trace = sum of the DIAGONAL elements of each class matrix
   (_iter walks the diagonal as the rows go by) */
data _trace;
    set sigma;
    by &target;
    array _x{*} &covars;
    retain trace _iter;
    if first.&target then do;
        _iter = 1;
        trace=0;
    end;
    trace+_x[_iter];
    _iter+1;
    if last.&target then output;
    keep &target trace;
run;

/* step 1.2 Update Sigma_k(lambda, gamma) = (1-gamma)*Sigma_k(lambda) + gamma*c*I */
/* where c = trace(Sigma_k(lambda)) / p, p=&nVars                                 */
data sigma;
    if _n_=1 then do;
        declare hash h(dataset:'_trace');
        h.defineKey("&target");
        h.defineData("trace");
        h.defineDone();
        call missing(trace);
    end;
    set sigma;
    by &target;
    array _x{*} &covars;
    retain _adj trace _iter;
    if first.&target then do;
        _iter=1;
    end;
    if _iter=1 then do;
        _rc=h.find();
        _adj= &gamma/&nVars * trace;
    end;
    put _iter= &target=;
    /* BUGFIX: scale EVERY element by (1-gamma); only the diagonal element
       (_j=_iter) also receives the gamma*c ridge term.  The original
       indexed _x[_iter] in both branches, so off-diagonal elements were
       never scaled and the diagonal was scaled repeatedly. */
    do _j=1 to &nVars;
        if _j=_iter then do;
            _x[_j] = (1-&gamma)*_x[_j] + _adj;
        end;
        else do;
            _x[_j] = (1-&gamma)*_x[_j];
        end;
    end;
    _iter=_iter+1;
    drop _type_ _adj _iter _j _rc trace;
run;

/* step 1.3 Obtain the inverse of W_k Sigma_k(lambda, gamma) - c*I using SVD.
   This inverse is the same as first inverting Sigma_k(lambda, gamma), with
   the S matrix deflated by W_k(lambda) and differenced by c, where
   c = gamma*|Z_v|^2/p, Z_v = sqrt(bk(v))*(X_v - mean(X_kv)),
   bk(v) = sk(v)*Wc(v)*wv/(Wc(v)-wv), and v is the current observation.
      1. apply SVD to obtain eigenvalues: %SVD(sigma, out_u, out_S, out_V)
      2. the whole CV is done in one DATA step, because the remaining
         quantities (|Z_v|, sk(v), Wc(v)) all need at least one pass
         through the original data. */
data _iris;
    array _m{*} &covars;
    /* per-class means: one row per class (the original declared this
       array 1-D but indexes it with [_class, _j]) */
    array _mt{&nClass, &nVars} _temporary_;
    if _n_=1 then do;
        _class=1;
        do until (eof);
            set stat(where=(_TYPE_='MEAN' & &target^="")) end=eof;
            do _j=1 to dim(_m);
                _mt[_class, _j]=_m[_j];
            end;
            _class+1;
        end;
    end;
    array _rank1{&nVars, &nVars} _temporary_;
    set iris point=1;  /* NOTE(review): POINT= normally takes a variable; confirm intent */
    do _j=1 to dim(_m);
        _m[_j]=_m[_j]-_mt[1, _j];
    end;
    do _j=1 to &nVars;
        do _k=1 to &nVars;
            /* rank-1 update Z_v * Z_v^t (original had _rank/_[_k] typos) */
            _rank1[_j, _k]=_m[_j] * _m[_k];
            /*  MORE CODE TO COME*/
        end;
    end;
run;

Control Excel via SAS DDE & Python win32com

Excel is probably the most used interface between human and data. Whenever you are dealing with business people, Excel is the de facto means for all things about data processing. I used to only use SAS and Python for number crunching but in one of my r…

Spark practice (2): query text using SQL

In a class of a few children, use SQL to find those who are male and weight over 100.
class.txt (including Name Sex Age Height Weight)
Alfred M 14 69.0 112.5 
Alice F 13 56.5 84.0
Barbara F 13 65.3 98.0
Carol F 14 62.8 102.5
Henry M 14 63.5 102.5
James M 12 57.3 83.0
Jane F 12 59.8 84.5
Janet F 15 62.5 112.5
Jeffrey M 13 62.5 84.0
John M 12 59.0 99.5
Joyce F 11 51.3 50.5
Judy F 14 64.3 90.0
Louise F 12 56.3 77.0
Mary F 15 66.5 112.0
Philip M 16 72.0 150.0
Robert M 12 64.8 128.0
Ronald M 15 67.0 133.0
Thomas M 11 57.5 85.0
William M 15 66.5 112.0

Thoughts

The challenge is to transform unstructured data to structured data. In this question, a schema has to be applied including column name and type, so that the syntax of SQL is able to query the pure text.

Single machine solution

Straightforward and simple with Python's built-in sqlite3 module.
import sqlite3

# In-memory table typed per the class.txt schema; SQL then filters it.
conn = sqlite3.connect(':memory:')
c = conn.cursor()
c.execute("""CREATE TABLE class
             (Name text, Sex text, Age real, Height real, Weight real)"""
)

with open('class.txt') as infile:
    for l in infile:
        # parameterized INSERT -- values are bound, never string-formatted
        c.execute('INSERT INTO class VALUES (?,?,?,?,?)', l.split())
conn.commit()

for x in c.execute("SELECT * FROM class WHERE Sex = 'M' AND Weight > 100"):
    print(x)
conn.close()

Cluster solution

Spark SQL is built on Hive and seamlessly queries JSON-formatted, semi-structured data. Dumping the JSON file onto the file system is the first step.
import os
import subprocess
import json
from pyspark import SparkContext
from pyspark.sql import HiveContext
sc = SparkContext()
hiveCtx = HiveContext(sc)
def trans(x):
    """Turn a whitespace-split class.txt record into a typed row dict.

    *x* is ``[Name, Sex, Age, Height, Weight]`` as strings; Age becomes an
    int and Height/Weight become floats so SQL comparisons work numerically.
    """
    name, sex, age, height, weight = x
    return {'Name': name, 'Sex': sex, 'Age': int(age),
            'Height': float(height), 'Weight': float(weight)}
# Remove the output directory for JSON if it exists --
# saveAsTextFile refuses to overwrite an existing directory.
if 'class-output' in os.listdir('.'):
    subprocess.call(['rm', '-rf', 'class-output'])

rdd = sc.textFile('class.txt')
rdd1 = rdd.map(lambda x: x.split()).map(trans)
rdd1.map(json.dumps).saveAsTextFile('class-output')

# Load the JSON back through the HiveContext (schema is inferred)
# and expose it to SQL as table `class`.
infile = hiveCtx.jsonFile("class-output/part-00000")
infile.registerTempTable("class")

query = hiveCtx.sql("""SELECT * FROM class WHERE Sex = 'M' AND Weight > 100
"""
)
for x in query.collect():
    print(x)

sc.stop()
In conclusion, JSON should be considered if SQL is desired on Spark.