Hidden Markov Models

Table of Contents

Model

Hidden Markov Model (HHM) is a probability model used to model latent states ( $z_t$ ) based on observations which depends on the current state of the system. That is; at some step $t$ the system is in some "internal" / unobservable / latent state $z_t$. We then observe an output $x_t$ which depends on the current state, $z_t$, the system is in.

hmm.png

Derivation

  • Hidden states/rvs $z_1, ..., z_k \in Q$
    • $Q = \{1, ..., m\}$ i.e. a discrete set of states
  • Observed rvs $\mathbf{x}_1, ..., \mathbf{x}_k \in \chi$
    • $\chi$ is the set of all possible values that $\mathbf{x}$ can take, e.g. discrete, $\mathcal{R}, \mathcal{R}^d$

Then,

\begin{equation*}
p(\mathbf{x}_1, ..., \mathbf{x}_n, z_1, ..,  z_n) 
= p(z_1) p(\mathbf{x}_1 | z_1) \quad \overset{n}{\underset{k=2}{\prod}} p(z_k | z_{k-1}) p(\mathbf{x}_k | z_k)
\end{equation*}
1

The reason why we use $\mathbf{x_k}$ here is that elements of $\chi$, can potentially be multi-dimensional, e.g. $\mathcal{R}^d$, then $\mathbf{x}_k$ represents a vector of observed rvs, not just a single observed rv.

Parameters

  • Transition probabilities/matrix: $T(i, j) = P(Z_k = j | Z_{k-1} = i)$
  • Emission probabilites: $\varepsilon_i (x) = p(x | Z_k = i)$ for $i \in Q, x \in \chi$
    • $\varepsilon_i$ is then a prob. dist. on $\chi$
  • Initial dist: $\pi(i) = P(Z_1 = 1), \quad i \in Q$

The difference between $z_k$ and $Z_k$, e.g. lower- and uppercase notation is that $z_k$ represents the distribution of the rv $Z_k$, while $Z_k$ is the actual rv, and so $Z_k = i$ means the case where the rv $Z_k$ takes on the value $i$. The notation $P(Z_k = i)$ for some arbitrary $i$ is then talking about a discrete distribution.

Using the substitutions from above, the join distribution is the following:

\begin{equation*}
p(\mathbf{x}_1, ..., \mathbf{x}_n, z_1, ..,  z_n) 
= \pi(i) \varepsilon_{z_1} (\mathbf{x}_1) \quad 
\overset{n}{\underset{k=2}{\prod}} T(z_k, z_{k-1}) \varepsilon_{z_k}(\mathbf{x}_k)
\end{equation*}
2

The emission prob. dist for the different values of the hidden states can be basically any prob. dist.

The plot below shows a problem where HMM might be a good application, where we have the states $Q = \{0, 1\}$ in blue and observations in red with emission probability $\varepsilon_z = \mathcal{N}(z, 0.5)$.

simple_hhm_example_problem.png

Forward-backward algorithm / alpha-beta recursion

Example of a dynamic programming algorithm. This provides us with a way of efficiently estimating the probability of the system being in some state $z_k$ at the $k^{\text{th}}$ step given all $n$ observations we've seen, $\mathbf{X}$, i.e. $p(z_k | \mathbf{X})$.

This does not provide us with a way to calculate the probability of a sequence of states, given the observations we've seen. For that we have to have a look at the Viterbi algorithm.

Notation

  • $\mathbf{X} = (\mathbf{x}_1, ..., \mathbf{x}_n)$ i.e. the sequence of observations made, not the rv representing some $\mathbf{x}$
  • $\mathbf{X}_{i:j}$ represents the sequence of observations from the $i^{\text{th}}$ observation to the $j^{\text{th}}$ observation

Algorithm

We make the assumptions that we know $p(x_k | z_k)$, $(p(z_k | z_{k-1})$ and $p(z_1)$.

Our goal of the entire algorithm is to compute $p(z_k | \mathbf{X})$.

Forward algorithm
computes $p(z_k, \mathbf{X}_{1:k})$ for all $k \in \{1, ..., n\}$
Backward algorithm
computes $p(\mathbf{X}_{k+1:n} | z_k)$ for all $k \in \{1, ..., n\}$

We can write our goal in the following way:

    \begin{equation*}
    p(z_k | \mathbf{X}) \propto p(z_k, \mathbf{X})
    = p(\mathbf{X}_{k+1:n} | z_k, \mathbf{X}_{1:k}) p(z_k, \mathbf{X}_{1:k})
\end{equation*}
3

Which means that if we can solve the forward and backward algorithms described above, we can compute $p(z_k | \mathbf{X})$.

What we're actually saying here is:

  • Given that we know the value of the rv. $Z_k$ the sequence of observations $\mathbf{X}_{k+1:n}$ does not depend on the observations prior to the $k^{\text{th}}$ observation.
  • Observations prior to the $k^{\text{th}}$ observation, does not depend on the observations in the future

Forward algorithm

Our goal is to compute $p(z_k, \mathbf{X}_{1:k})$. We start out with the following:

    \begin{equation*}
    \begin{split}
    \alpha_k (z_k) &= p(z_k, \mathbf{X}_{1:k}) \\
    &= \overset{m}{\underset{z_{k-1} = 1}{\sum}} p(z_k, z_{k-1}, \mathbf{X}_{1:k}) \\
    &= \underset{z_{k-1}}{\sum} p(\mathbf{x}_k | z_k, z_{k-1}, \mathbf{X}_{1:k-1}) p(z_k | z_{k-1}, \mathbf{X}_{1:k-1}) p(z_{k-1} | x_{1:k-1})
    \end{split}
\end{equation*}
4

In words: probability of observing $\mathbf{x}_k$ given that we have observed the sequence $\mathbf{X}_{1:k-1}$ and we're in state $z_k$ marginalized / summed over the possible previous states $z_{k-1}$.

But since the observation at the $k^{\text{th}}$ step, $\mathbf{x}_k$, is independent of $z_{k-1}$ given that we know $z_k$. And so we have:

    \begin{equation*}
    p(\mathbf{x}_k | z_k, z_{k-1}, \mathbf{X}_{1:k-1}) = p(\mathbf{x}_k | z_k)
\end{equation*}
5

And the $k^{\text{th}}$ state, $z_k$, is independent of the previous observations $\mathbf{X}_{1:k-1}$ given that we know $z_{k-1}$. And so further we have:

    \begin{equation*}
    p(z_k | z_{k-1}, \mathbf{X}_{1:k-1}) = p(z_k | z_{k-1})
\end{equation*}
6

Thus, we end up with:

    \begin{equation*}
    \begin{split}
    \alpha_k (z_k) &= p(z_k, \mathbf{X}_{1:k}) \\
    &= \overset{m}{\underset{z_{k-1} = 1}{\sum}} p(z_k, z_{k-1}, \mathbf{X}_{1:k}) \\
    &= \underset{z_{k-1}}{\sum} p(\mathbf{x}_k | z_k, z_{k-1}, \mathbf{X}_{1:k-1}) p(z_k | z_{k-1}, \mathbf{X}_{1:k-1}) p(z_{k-1} | x_{1:k-1}) \\
    &= \underset{z_{k-1}}{\sum} p(\mathbf{x}_k | z_k) p(z_k | z_{k-1}) p(z_{k-1} | x_{1:k-1}) \\
    &= \underset{z_{k-1}}{\sum} p(\mathbf{x}_k | z_k) p(z_k | z_{k-1}) \alpha_{k-1}(z_{k-1})
    \end{split}
\end{equation*}
7

And if you go back to our assumptions you see that we have assumed that we know the emission probabilities $p(\mathbf{x}_k | z_k)$ and the transition probabilities $p(z_k | z_{k-1})$. And thus we only have one unknown in our equation.

Tracking this recursive relationship all the way back to the $z_1$, we end up with zero unknowns, since we also assumed to know the initial probability $p(z_1)$.

So just to conclude:

    \begin{equation*}
    \begin{split}
    \alpha_k &= \underset{z_{k-1}}{\sum} p(\mathbf{x}_k | z_k) p(z_k | z_{k-1}) \alpha_{k-1}(z_{k-1}) \\
    \alpha_1 &= p(z_1, \mathbf{x}_1) = p(z_1) p(\mathbf{x}_1 | z_1)
    \end{split}
\end{equation*}
8

provides us with the enough info to compute $p(z_k, \mathbf{X}_{1:k})$ in a recursive manner.

Time complexity
  • For each step $k$ we have to sum over the $m$ possible values for the previous state $z_{k-1}$, and so this is of time complexity $\Theta(m)$
  • But we have to do the step above for every possible value of $z_k$, thus we have an additional $\Theta(m)$
  • Finally, we gotta do $n$ of these steps, and so we have the two steps above with total time complexity $\Theta(m^2)$ being computed $n$ times, leading us the time complexity of the Forward algorithm to be $\Theta(nm^2)$

This is cool as it is linear in the number of steps, $n$!

Backward algorithm

Our goal is to compute $p(\mathbf{X}_{k+1:n} | z_k)$ for all $k \in \{1, ..., n\}$. And again we try to deduce some recursive algorithm.

We start out with:

    \begin{equation*}
    p(\mathbf{X}_{k+1:n} | z_k) = \overset{m}{\underset{z_{k+1}=1}{\sum}} p(z_{k+1} | z_k) p(\mathbf{X}_{k+1} | z_{k+1}) p(\mathbf{X}_{k+2:n} | z_{k+1})
\end{equation*}
9

Then if we simply define some auxilary variable $\beta_k (z_k)$ to be

    \begin{equation*}
    \beta_k (z_k) = p(\mathbf{X}_{k+1:n} | z_k) = \overset{m}{\underset{z_{k+1}=1}{\sum}} p(z_{k+1} | z_k) p(\mathbf{X}_{k+1} | z_{k+1}) \beta_{k+1}(z_{k+1})
\end{equation*}
10

Aaaand we got ourselves a nice recursive relationship once again! Again, by assumption, we know what $p(\mathbf{x}_{k+1} | z_{k+1})$ and $p(z_{k+1} | z_k)$ is.

And the final step is as follows

\begin{equation*}
\begin{split}
\beta_{n-1} (z_{n-1}) &= p(\mathbf{x}_n | z_{n-1}) \\
&= \underset{z_n}{\sum} p(\mathbf{x}_n | z_n) p(z_n | z_{n-1}) \\
& \implies \beta_n(z_n) = 1, \quad \forall z_n
\end{split}
\end{equation*}
11

Thus, we conclude the Backward algorithm with the following:

    \begin{equation*}
    \begin{split}
    \beta_k (z_k) &= \overset{m}{\underset{z_{k+1}=1}{\sum}} p(z_{k+1} | z_k) p(\mathbf{x}_{k+1} | z_{k+1}) \beta_{k+1}(z_{k+1}), \quad k = 1, ..., n-1 \\
    \beta_{n} (z_{n}) &= 1, \quad \forall z_n
    \end{split}
\end{equation*}
12
Time complexity

Again we have the same reasoning as for the forward algorithm time complexity and end up with time complexity of $\Theta(nm^2)$.

Implementation

Throughout this implementation we will be validating against this wikipedia example.

First we gotta turn those data representations into structures which our implementation can work with:

wiki_emission_probs = np.array([[0.9, 0.1], [0.2, 0.8]])
wiki_initial_dist = np.array([[0.5, 0.5]])
wiki_observations = np.array([0, 0, 1, 0, 0])
wiki_transition_probs = np.array([[0.7, 0.3], [0.3, 0.7]])

S_wiki = np.arange(2)
T_wiki = wiki_transition_probs
X_wiki = wiki_observations
epsilon_wiki = lambda z, x: wiki_emission_probs[z][x]
pi_wiki = np.array([0.5, 0.5])
Forward
cnt = 0


def forward(pi, X, S, epsilon, T):
    n_obs = len(X)
    n_states = len(S)

    alpha = np.zeros((n_obs + 1, n_states))
    alpha[0] = pi

    for k_minus_one, x in enumerate(X):
        # readjust the index of alpha since
        # we're including pi too
        k = k_minus_one + 1

        alpha_k = alpha[k]

        for z_k in S:
            alpha_k[z_k] = sum(
                # p(z_k | z_{k-1}) * p(x_k | z_k) * alpha_{k-1}(z_{k-1})
                T[z, z_k] * epsilon(z_k, x) * alpha[k - 1][z]
                for z in S
            )

        # gotta normalize to get probab p(z_k, x_{1:k})
        alpha[k] = alpha_k / alpha_k.sum()

    return alpha

forward(pi_wiki, X_wiki, S_wiki, epsilon_wiki, T_wiki)
array([[ 0.5       ,  0.5       ],
       [ 0.81818182,  0.18181818],
       [ 0.88335704,  0.11664296],
       [ 0.19066794,  0.80933206],
       [ 0.730794  ,  0.269206  ],
       [ 0.86733889,  0.13266111]])

YAY!

Backward
cnt = 0


def backward(X, S, epsilon, T):
    n_obs = len(X)
    n_states = len(S)
    beta = np.zeros((n_obs + 1, n_states))  # 1 extra for initial state

    beta[0] = np.ones(*S.shape)
    beta[0] /= beta[0].sum()

    for k_plus_one in range(n_obs):  # n-1, ..., 1
        k = k_plus_one + 1               # iterating backwards yah know
        beta_k = beta[k]                       # just aliasing the pointers
        beta_k_plus_one = beta[k_plus_one]     # beta_{k+1}
        x_plus_one = X[k_plus_one]       # x_{k+1}

        for z_k in S:                    # for each val of z_k
            beta_k[z_k] = sum(
                # p(z_{k+1} | z_k) * p(X_{k+1} | z_{k+1}) * beta_{k+1}(z_{k+1})
                T[z_k, z] * epsilon(z, x_plus_one) * beta_k_plus_one[z]
                for z in S               # z_{k+1}
            )

            # validating the time complexity
            global cnt
            cnt += n_states

        # normalize the beta over the z's at each stage
        # since it represents p(x_{k+1:n} | z_k)
        beta_k = beta_k / beta_k.sum()

    # finally we gotta get the probabilities of the intial state
    x = X[0]
    for z_1 in S:
        beta[-1][z_1] = sum(T[z_1, z] * epsilon(z, x) * beta[-2][z] for z in S)

    # and normalize this too
    beta[-1] /= beta[-1].sum()

    # reversing the results so that we have 0..n
    return beta[::-1]

backward(X_wiki, S_wiki, epsilon_wiki, T_wiki)
array([[ 0.64693556,  0.35306444],
       [ 0.03305881,  0.02275383],
       [ 0.0453195 ,  0.0751255 ],
       [ 0.22965   ,  0.12185   ],
       [ 0.345     ,  0.205     ],
       [ 0.5       ,  0.5       ]])

Which is the wanted result! Great!

<2017-03-05 Sun> I find it interesting that the forward and backward algorithms provide me with similar, but slightly different answers when we run both of them independently on all of our observations.

BRAINFART! The forward algorithm aims to compute the joint distribution $p(z_n, \mathbf{X}_{1:n})$, while the backward algorithm computes the conditional $p(\mathbf{X}_{2:n} | z_1)$, in the case where we give the algorithms all the observations. They're not computing the same thing, duh.

And just to validate the time-complexity:

print "Want it to be about %d; it is in fact %d" % (len(X) * len(S_wiki), cnt)
Want it to be about 400; it is in fact 20
Forward-backward

Now for bringing it all together: forward-backward.

cnt = 0


def forward_backward(pi, X, S, epsilon, T):
    n_obs = len(X)
    n_states = len(S)

    posterior = np.zeros((n_obs + 1, n_states))

    forward_ = forward(pi, X, S, epsilon, T)
    backward_ = backward(X, S, epsilon, T)

    for k in range(n_obs + 1):
        posterior[k] = forward_[k] * backward_[k]
        posterior[k] /= posterior[k].sum()

    return posterior

forward_backward(pi_wiki, X_wiki, S_wiki, epsilon_wiki, T_wiki)
array([[ 0.64693556,  0.35306444],
       [ 0.86733889,  0.13266111],
       [ 0.82041905,  0.17958095],
       [ 0.30748358,  0.69251642],
       [ 0.82041905,  0.17958095],
       [ 0.86733889,  0.13266111]])

Eureka! We got the entire posterior distribution of the hidden states given the observations!

Discussion

<2017-02-07 Tue> Even though the plot looks nice and all, it seems weird to me that everyone seems to be talkin about issues with underflow while I'm getting overflow issues when dealing with large n o.O

I wasn't normalizing at each step, but instead waited until I had all the values and then normalized. Which means that I would get insanely large values of alpha, and thus doom.

I'm not 100% but I think it would still be a probability distribution in the end, if, you know, I could get to the end without overflow error…

Example

predicted = []
posterior = forward_backward(pi, X, S, epsilon, T)

for p in posterior:
    predicted.append(np.argmax(p))
plt.scatter(steps, X, s=25)
plt.scatter(steps, zs, s=10)
plt.scatter(steps, predicted[:-1], alpha=0.1, c="g")
plt.title("%d samples" % n)

test2.png

Here you can see the actual states plotted in blue, the normally distributed observations in red, and the predictions of the hidden states as a vague green. Yeah, I should probably mess with the sizes and colors to make it more apparent..

Viterbi algorithm

Allows us to compute the posterior probability of a sequence of states, $\mathbf{z} = (z_1, ..., z_n)$, given some sequence of observations, $X = (\mathbf{\mathbf{x_1}, ..., \mathbf{x_n}})$.

Notation

  • $\mathbf{X} = (\mathbf{x}_1, ..., \mathbf{x}_n)$ i.e. the sequence of observations made, not the rv representing some $\mathbf{x}$
  • $\mathbf{X}_{i:j}$ represents the sequence of observations from the $i^{\text{th}}$ observation to the $j^{\text{th}}$ observation

Algorithm

The main idea behind the Viterbi algorithm is to find the optimal / most probable sub-sequence at each step, and then find the optimal / most probable sub-sequence at the next step, using the previous steps optimal path.

The naive approach would be to simply obtain the probabilities for all the different sequences of states, $(z_1, ..., z_n)$ given the sequence of observations $(\mathbf{x}_1, ..., \mathbf{x}_n)$. The issue with this approach is quite clearly the number of possible combinations, which would be $n^m$, where $m$ is the number of possible states.

A "dynamic programming" approach would be to attept to find a more feasible sub-problem and check if we can recursively solve these sub-problems, reducing computation by storing intermediate solutions as we go.

Viterbi_animated_demo.gif

Wikipedia

So, let's start with the most likely intial path:

$V_1(z) = \underset{z \in S}{\text{argmax }} \Big( p(\mathbf{x_1} | z) \cdot \pi_z \Big)$

where

  • $\pi_z = P(z_1 = z)$, i.e. probability of first state taking value $z$, where $z \in S$

Our next step is then to define $\underset{z \in S}{\text{argmax }} P(z_2 = z)$ in terms of

The most likely state sequence $(z_1, ..., z_2)$ is given by the recurrence relations:

\begin{equation*}
\begin{split}
V_{k, z} &amp;= \underset{z_k \in S}{\text{max}} \Big( P(z_k | z) \cdot T(x_k | z) \cdot V_{k-1, z_k} \Big) \\
V_{1, z} &amp;= P(x_1 | z) \cdot \pi_{z}
\end{split}
\end{equation*}
13

where

  • $P(\mathbf{x}_1, ..., \mathbf{x}_n, z_1, ... , z_n)$ - probability of ???
  • $V_{k, z} = \underset{z \in S}{\text{max }} P(z_1, ..., z_{k} | z_n = z, \mathbf{x}_1, ..., \mathbf{x}_k)$, i.e. probability of most probable sequence of the first $k$ elements, given that the (entire?) path ends in state $z$.

My interpretation

Viterbi algorithm is all about making locally optimal choices, storing this locally optimal sub-paths as you go, and reuse this computation when trying different choices of states.

You start out by the following:

  1. Choose the initial state $z_1$ s.t. you maximize the probability of observing $\mathbf{x}_1$, i.e.

    \begin{equation*}
\begin{split}
     \hat{z_1} &amp;= \underset{z}{\text{argmax }} P( \mathbf{x}_1, z_1 = z) \\
     &amp;= \underset{z}{\text{argmax }} P(z_1 = z) P(\mathbf{x}_1 | z_1 = z)
\end{split}
\end{equation*}
14

    Now to the crucial part (and the part which makes this a dynamic programming algorithm!), we store both the probability of the sequence we've chosen so far and the states we've chosen. In this case, it's simply $\hat{z}_1$ and $P(z_1 = \hat{z}_1 | \mathbf{x}_1)$.

  2. Choose the next state $z_2$ s.t. you maximize the probability of observing $\mathbf{x}_2$, conditioned on already being in state $\hat{z}_1$, i.e.

    \begin{equation*}
\begin{split}
\hat{z}_2 &amp;= \underset{z}{\text{argmax }} P(x_2, z_2 = z | x_1, z_1 = \hat{z}_1) \\
&amp;= \underset{z}{\text{argmax }} P(z_2 = z | z_1 = \hat{z}_1) P(\mathbf{x}_2 | z_2 = z)
\end{split}
\end{equation*}
15

    And again, we store the current path and it's probability, i.e. $(\hat{z}_1, \hat{z}_2)$ and $P(\hat{z}_2, \hat{z}_1 | \mathbf{x}_1, \mathbf{x}_2)$. Important: We store this somewhere else than the first sub-path obtained, i.e. $(\hat{z}_1)$ and $P(\hat{z}_1 = z_1 | \mathbf{x}_1)$.

  3. Perform this step until you reach the final step/observation, i.e. $n$ times and you end up in $\hat{z}_n$.

Pretty complex, ehh? Simply choosing what seems the best at each step you take.

Now, let's think about the number of computations we got to do here.

  • At each step we got to decide between $m$ possible states, computing the probability $P(\mathbf{x}_k | z_k = z)P(z_k = z | z_{k-1} = \hat{z}_{k-1})$ for each one of them. This is required so that we know which one to choose for our next state. That is; at each step we do $m$ computations.
  • We make the above choice until we reach the end, and so we this is $n$ steps.

This leads us to a complexity of $\Theta (mn)$, where

  • $m$ is number of possible states
  • $n$ is number of steps/observations

Unfortunately we're not done yet. We've performed $m*n$ operations, but what if the initial probability of the state we chose first, $\hat{z}_1$, has "horrible" probabilites as soon as you get past the first step?! Maybe if we set $z_1$ equal to some other value, we would get a much better probability in the end?

Hence, smart as we are, we go "Okay, let's try again, but this time we choose the second best initial state!". And because we're running out of notation, we call this $\hat{z}_1^{(2)}$. Yeah I know, it's pretty bad, but you get the picture. The $(2)$ denotes that this is the 2nd try until we've either realized that it's worse than our current $\hat{z}_1$, in which case we discard the new value, or we realize it's in fact better, and so we simply replace it, i.e. set $\hat{z}_1 = \hat{z}_1^{(2)}$, also replacing the probability.

But how do we decide that the new suggested value is better? By walking through the path again, of course! But, this time we're going to try and be smart.

  • If we at some step $t$ reach a node that we visited in our first "walk", we don't have to keep going until the end, but instead we terminate and do the following:
    • Compare the probability of the sub-sequence up until this step that we're currently looking at, with the previous best estimate for this same sub-sequnce, i.e. compare $P(\hat{z}_1^{(2)}, ..., \hat{z}_{t-1}^{(2)}, \hat{z}_t^{(2)}, \hat{z}_{t+1}, ..., \hat{z}_n | \mathbf{X}_{1:n})$ to $P(\hat{z}_1, ..., \hat{z}_{t-1}, \hat{z}_t, \hat{z}_{t+1}, ..., \hat{z}_n | \mathbf{X}_{1:n})$.
    • If the new sub-path is more probable, then we replace our current best estimate for this sub-path, and recompute the joint conditional distribution for the rest of the path.

      • Simply divide away our previous estimate, and multiply with

      new and better sub-path probability:

          \begin{equation*}
    P(\hat{z}_1^{(2)}, ..., \hat{z}_{t-1}^{(2)}, \hat{z}_t^{(2)} | \mathbf{X}_{1:n})
\frac{P(\hat{z}_1, ..., \hat{z}_{t-1}, \hat{z}_t, \hat{z}_{t+1}, ..., \hat{z}_n | \mathbf{X}_{1:n})}{P(\hat{z}_1, ..., \hat{z}_{t-1}, \hat{z}_t | \mathbf{X}_{1:n})}
\end{equation*}
16

For the sake of clarification, when we use the word sub-path, we're talking about a sequence of states $(z_1, ..., z_k)$ for some $k \leq n$, i.e. all sub-paths start at step $1$, ending at some $k$. We're not talking about sequences starting at any arbitrary $i$.

We repeat the process above for all possible initial states, which gives us another factor of $m$ in the time-complexity. This is due to having to perform the (possibly, see below) $nm$ number of operations we talked about earlier for each initial state, for which there are $m$ different ones.

This means that, at worst, we will not reach any already known / visited sequence, and so will do all the $n$ steps for each possible initial state. For example, consider the case of every state only being able to transition to itself. In such cases we will have a time complexity of $\Theta (n m^2)$ which is still substantially better than $O (m^n)$, the naive approach. But as I said, this is worst case scenario! More often we will find sub-sequences that we already have computed, and so we won't have to do all the $n$ steps for every initial state!

Thus, for the Viterbi algorithm we get a time complexity of $O (nm^2)$ (note Big-O, not Big-Theta)!

Implementation

First we need to set up the memoization part of the algorithm. Between each "attempt" we need to remember the following:

  • At each step $k$, the probability of most the likely sub-path. We'll store as an array best´╗┐_sequence, with the the $i^{th}$ element pointing to an array of length $i$, holding the sequence/path of states for our best guess so far.
  • At each step $k$, the state which was chosen by the most likely sub-path. We'll store this in an array best_sequence, with the $i^{th}$ element being the probability of the sequence/path stored at best_sequence_probas[i].
n_obs = len(X_wiki)
n_states = len(S_wiki)

best_sequence = [None] * n_obs
best_sequence_probas = np.zeros(n_obs)

n_obs, n_states
5 2

First we choose the "best" initial state as our starting point, and then start stepping through. Best initial state corresponds to maximize the $P(z_1 | x_1) \propto P(x_1 | z_1) P(z_1)$ over $z_1$.

Notice how we're not actually computing real probabilities as we're not normalizing. The reason for this is simply that it is not necessary to obtain the most likely sequence.

x_1 = X_wiki[0]

z_hat = S[0]
z_hat_proba = epsilon_wiki(S[0], x_1) * pi_wiki[S[0]]

initial_step_norm_factor = z_hat_proba

for z in S_wiki[1:]:
    z_proba = epsilon_wiki(z, x_1) * pi_wiki[z]
    initial_step_norm_factor += z_proba

    if z_proba > z_hat_proba:
        z_hat = z
        z_hat_proba = z_proba

best_sequence[0] = 1
best_sequence_probas[0] = epsilon_wiki(1, x_1) * pi_wiki[1] / initial_step_norm_factor

best_sequence[0], best_sequence_probas[0]
1 0.18181818181818182
for i, x in enumerate(X_wiki):
    if i == 0:
        continue

    prev_sequence = best_sequence[i - 1]
    z_hat_prev = best_sequence[i - 1]
    z_hat_prev_proba = best_sequence_probas[i - 1]

    # initialize
    z_hat = S_wiki[0]
    z_hat_proba = T_wiki[z_hat_prev, z_hat] * epsilon_wiki(z_hat, x)

    step_norm_factor = z_hat_proba

    for z in S_wiki[1:]:
        z_proba = T_wiki[z_hat_prev, z] * epsilon_wiki(z, x)
        step_norm_factor += z_proba

        if z_proba > z_hat_proba:
            z_hat = z
            z_hat_proba = z_proba

    best_sequence[i] = z_hat
    best_sequence_probas[i] = z_hat_proba * z_hat_prev_proba / step_norm_factor

best_sequence, best_sequence_probas
(1 0 1 0 0) array ((0.18181818 0.11973392 0.09269723 0.06104452 0.0557363))

When I started doing this, I wanted to save the sub-path at each step. Then I realized that this was in fact not necessary, but we instead could simply store the state $\hat{z}_k$ at step $k$ and the probability of the previous best estimate for the sub-path being in state $\hat{z}_k$ at step $k$. When we're finished, we're left with a sequence $\mathbf{\hat{z}} = (\hat{z}_1, ..., \hat{z}_n)$ which is the best estimate.

If you don't remember; what we did previously was storing the entire sub-path up until the current step, i.e. at step $k$ we stored the sequence $(\hat{z}_1, ..., \hat{z}_{k-1}, \hat{z}_k)$, which is bloody unecessary. This would allow us to afterwards obtain the best estimate for any path though, so that would be nice. But whenever we would hit a better estimate for a previously obtained path, we would have to step through each $k-1$ element for the stored sub-paths and replace them.

That was our first run, now we gotta give it a shot for the second best one. Since, as mentioned before, we've assumed a uniform prior, we simply pick the next state.

curr_seq = [0]
curr_seq_probas = [epsilon_wiki(curr_seq[0], x_1) * pi[curr_seq[0]] / initial_step_norm_factor]

for i, x in enumerate(X_wiki):
    if i == 0:
        continue

    z_prev = curr_seq[i - 1]
    z_prev_proba = curr_seq_probas[i - 1]

    # initialize the argmax process
    z_next = S_wiki[0]
    z_next_proba = T_wiki[z_prev, z_next] * epsilon_wiki(z_next, x)

    step_norm_factor = z_next_proba

    for z in S_wiki[1:]:
        z_proba = T_wiki[z_prev, z] * epsilon_wiki(z, x)
        step_norm_factor += z_proba

        if z_proba > z_next_proba:
            z_next = z
            z_next_proba = z_proba

    curr_seq_proba = curr_seq_probas[i - 1] * z_next_proba / step_norm_factor
    curr_seq.append(z_next)
    curr_seq_probas.append(curr_seq_proba)

    # check if the rest of the path has been
    # computed already
    z_hat = best_sequence[i]
    if z_hat == z_next:
        # we've computed this before, and the rest
        # of the path is the best estimate we have
        # as of right now
        best_seq_proba = best_sequence_probas[i]
        if best_seq_proba < curr_seq_proba:
            # the new path is better, so we gotta
            # replace the best estimate
            best_sequence[:i + 1] = curr_seq
            # and the preceding probabilities
            best_sequence_probas[:i + 1] = curr_seq_probas

            # then we have to recompute the probabilities
            # for the rest of the best estimate path
            for j in range(i + 1, n_obs):
                best_sequence_probas[j] = curr_seq_proba * \
                         best_sequence_probas[j] \
                         / best_seq_proba

            print "We found a better one! Current is %f, new is %f" \
                % (best_seq_proba, curr_seq_proba)

        # since we've already computed the sub-path after this step
        # given the state we are currently in, we terminate
        break

curr_seq, curr_seq_probas
0 0
0.8181818181818181 0.7470355731225296
best_sequence, best_sequence_probas
(0 0 1 0 0) array ((0.81818182 0.74703557 0.57835012 0.38086471 0.34774604))

This looks very good to me!

def viterbi(pi, X, S, epsilon, T):
    n_obs = len(X)

    # initialize
    best_sequence = [None] * n_obs
    best_sequence_probas = np.zeros(n_obs)

    x_1 = X[0]
    initial_marginal = sum(epsilon(z, x_1) * pi[z] for z in S)

    sorted_S = sorted(S, key=lambda z: epsilon(z, x_1) * pi[z], reverse=True)

    for z_1 in sorted_S:
        curr_seq = [z_1]
        curr_seq_probas = [epsilon(z_1, x_1) * pi[z_1] / initial_marginal]

        for i, x in enumerate(X):
            if i == 0:
                continue

            z_prev = curr_seq[i - 1]
            z_prev_proba = curr_seq_probas[i - 1]

            # Max and argmax simultaenously
            # initialize the argmax process
            z_next = S[0]
            z_next_proba = T[z_prev, z_next] * epsilon(z_next, x)
            step_marginal = z_next_proba

            for z in S[1:]:
                z_proba = T[z_prev, z] * epsilon(z, x)

                step_marginal += z_proba

                if z_proba > z_next_proba:
                    z_next = z
                    z_next_proba = z_proba

            # Memoize our results
            curr_seq_proba = z_prev_proba * z_next_proba / step_marginal
            curr_seq.append(z_next)
            curr_seq_probas.append(curr_seq_proba)

            # check if the rest of the path has been computed already
            z_hat = best_sequence[i]
            if z_hat == z_next:
                # We've computed this before, and the rest
                # of the path is the best estimate we have.
                # Thus we check if this new path is better!
                best_seq_proba = best_sequence_probas[i]
                if best_seq_proba < curr_seq_proba:
                    # replace the best estimate sequence
                    best_sequence[:i + 1] = curr_seq
                    # and the corresponding probabilities
                    best_sequence_probas[:i + 1] = curr_seq_probas

                    # recompute the probabilities
                    # for the rest of the best estimate path
                    for j in range(i + 1, n_obs):
                        best_sequence_probas[j] = curr_seq_proba * \
                                 best_sequence_probas[j] \
                                 / best_seq_proba

                    print "We found a better one at step %d!" \
                                             "Current is %f, new is %f" \
                                             % (i, best_seq_proba, curr_seq_proba)

                # since we've already computed the sub-path after this step
                # given the state we are currently in, we terminate
                break

        # gone through all the observations for this state
        if i == n_obs - 1:
            best_seq_proba = best_sequence_probas[i]
            if best_seq_proba < curr_seq_proba:
                best_sequence = curr_seq
                best_sequence_probas = curr_seq_probas

    return best_sequence, best_sequence_probas
viterbi(pi_wiki, X_wiki, S_wiki, epsilon_wiki, T_wiki)
0 0 1 0 0
0.8181818181818181 0.7470355731225296 0.5783501211271196 0.3808647139129812 0.3477460431379394

Discussion

Using numpy.ndarray for curr_seq

I'm actually not sure whether or not this would speed up things. The way Python list work provides us with a ammortized constant time append method. This means that we can potentially save loads of memory compared to having to preallocate memory for n_obs elements using numpy.ndarray. Most of the time we won't be walking through the entire sequence of observations, and so most of the memory allocated won't even be used.

Nonetheless I haven't actually made any benchmarks, so it's all speculation. I just figured that it wasn't worth trying for the reason given here.

Python class implementation

Should this even be a class? I think it sort of makes sense, considering it's kind of a model of a state machine. And so modelling it in a imperative manner makes sense in my opinion.

Would it be a good idea to implement all of these as a staticmethod, and then have the instance specific method simply call the staticmethod? This would allow us to choose whether or not we actually want to create a full instance of a HiddenMarkovModel or not.

Naaah, we simply have functions which take those arguments and then have the class HiddenMarkovModel call those methods.

Methods:

  • forward
  • backward
  • forward-backward
  • viterbi

Properties:

  • posterior
  • epsilon and emission_probabilities
  • T and transition_probabilities
  • pi and initial_probabilities
  • S and states
  • X and observations
class HiddenMarkovModel:
    """
    A Hidden Markov Model (HMM) is fully defined by the
    state space, transition probabilities, emission probabilities,
    and initial probabilities of the states.

    This makes it easier to define a model, and then
    rerun it on different observations `X`.

    Parameters
    ----------
    pi: array-like floats, 1d
        Initial probability vector.
    S: array-like ints, 1d
        Defines the possible states the system can be in.
    epsilon: callable(z, x) -> float
        Takes current state :int:`z` and observation :float:`x`
        as argument, returning a :float: representing the 
        probability of seeing observation :float:`x` given
        the system is in state :int:`z`.
    T: array-like floats, 2d
        Matrix / 2-dimensional array where the index `[i][j]`
        gives us the probability of transitioning from state
        `i` to state `j`.
    X: array-like (optional)
        Represents the observations. This can be a array-like
        structure of basically anything, as long as :callable:`epsilon`
        can handle the elements of `X`. So observations can be
        scalar values, vectors, or whatever. 
    """
    def __init__(self, pi=None, S=None, epsilon=None, T=None, X=None):
        self.pi = pi
        self.S = S
        self.epsilon = epsilon
        self.T = T
        self.X = X

        # some aliases for easier use for others
        self.initial_probabilities = self.pi
        self.states = self.S
        self.emission_probabilities = self.epsilon
        self.transition_probabilities = self.T
        self.observations = self.X

        self.posterior = None
        self.optimal_path = None

    def forward(self, X=None):
        if X is None:
            X = self.X

        return forward(self.pi, X, self.S, self.epsilon, self.T)

    def backward(self, X=None):
        if X is None:
            X = self.X

        return backward(X, self.S, self.epsilon, self.T)

    def forward_backward(self, X=None):
        if X is None:
            X = self.X

        n_obs = len(X)
        n_states = len(S)

        posterior = np.zeros((n_obs + 1, n_states))

        forward_ = self.forward(X)
        backward_ = self.backward(X)

        for k in range(n_obs + 1):
            posterior[k] = forward_[k] * backward_[k]
            posterior[k] /= posterior[k].sum()

        # Should we actually set it, or leave that to the user?
        self.posterior = posterior

        return posterior

    def viterbi(self, X=None):
        if X is None:
            X = self.X

        self.optimal_path = viterbi(self.pi, X, self.S, self.epsilon, self.T)
        return self.optimal_path
hmm = HiddenMarkovModel(pi_wiki, S_wiki, epsilon_wiki, T_wiki)
hmm.viterbi(X_wiki)
hmm.forward_backward(X_wiki)

hmm.optimal_path
0 0 1 0 0
0.8181818181818181 0.7470355731225296 0.5783501211271196 0.3808647139129812 0.3477460431379394

Appendix A: Full code examples