开发者

Clean and type-safe state machine implementation in a statically typed language?

开发者 https://www.devze.com 2023-04-11 12:26 出处:网络
I implemented a simple state machine in Python: import time def a(): print \"a()\" return b def b(): print \"b()\"

I implemented a simple state machine in Python:

import time

def a():
    print "a()"
    return b

def b():
    print "b()"
    return c

def c():
    print "c()"
    return a


if __name__ == "__main__":
    state = a
    while True:
        state = state()
        time.sleep(1)

I wanted to port it to C, because it wasn't fast enough. But C doesn't let me make a function that returns a function of the same type. I tried making the function of this type: typedef *fn(fn)(), but it doesn't work, so I had to use a structure instead. Now the code is very ugly!

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

typedef struct fn {
    struct fn (*f)(void);
} fn_t;

fn_t a(void);
fn_t b(void);
fn_t c(void);

fn_t a(void)
{
    fn_t f = {b};

    (void)printf("a()\n");

    return f;
}

fn_t b(void)
{
    fn_t f = {c};

    (void)printf("b()\n");

    return f;
}

fn_t c(void)
{
    fn_t f = {a};

    (void)printf("c()\n");

    return f;
}

int main(void)
{
    fn_t state = {a};

    for(;; (void)sleep(1)) state = state.f();

    return EXIT_SUCCESS;
}

So I figured it's a problem with C's broken type system. So I used a language with a real type system (Haskell), but the same problem happens. I can't just do something like:

type Fn = IO Fn
a :: Fn
a = print "a()" >> return b
b :: Fn
b = print "b()" >> return c
c :: Fn
c = print "c()" >> return a

I get the error, Cycle in type synonym declarations.

So I have to make some wrapper the same way I did for the C code like this:

import Control.Monad
import System.Posix

data Fn = Fn (IO Fn)

a :: IO Fn
a = print "a()" >> return (Fn b)

b :: IO Fn
b = print "b()" >> return (Fn c)

c :: IO Fn
c = print "c()" >> return (Fn a)

run = foldM (\(Fn f) () -> sleep 1 >> f) (Fn a) (repeat ())

Why is it so hard to make a state machine in a statically typed language? I have to make unnecessary overhead in statically typed languages as well. Dynamically typed languages don't have this problem. Is there an easier way to do it in a s开发者_C百科tatically typed language?


In Haskell, the idiom for this is just to go ahead and execute the next state:

type StateMachine = IO ()
a, b, c :: StateMachine
a = print "a()" >> b
b = print "b()" >> c
c = print "c()" >> a

You need not worry that this will overflow a stack or anything like that. If you insist on having states, then you should make the data type more explicit:

data PossibleStates = A | B | C
type StateMachine = PossibleStates -> IO PossibleStates
machine A = print "a()" >> return B
machine B = print "b()" >> return C
machine C = print "c()" >> return A

You can then get compiler warnings about any StateMachine that forgot some states.


If you use newtype instead of data, you don't incur any overhead. Also, you can wrap each state's function at the point of definition, so the expressions that use them don't have to:

import Control.Monad

newtype State = State { runState :: IO State }

a :: State
a = State $ print "a()" >> return b

b :: State
b = State $ print "b()" >> return c

c :: State
c = State $ print "c()" >> return a

runMachine :: State -> IO ()
runMachine s = runMachine =<< runState s

main = runMachine a

Edit: it struck me that runMachine has a more general form; a monadic version of iterate:

iterateM :: Monad m => (a -> m a) -> a -> m [a]
iterateM f a = do { b <- f a
                  ; as <- iterateM f b
                  ; return (a:as)
                  }

main = iterateM runState a

Edit: Hmm, iterateM causes a space-leak. Maybe iterateM_ would be better.

iterateM_ :: Monad m => (a -> m a) -> a -> m ()
iterateM_ f a = f a >>= iterateM_ f

main = iterateM_ runState a

Edit: If you want to thread some state through the state machine, you can use the same definition for State, but change the state functions to:

a :: Int -> State
a i = State $ do{ print $ "a(" ++ show i ++ ")"
                ; return $ b (i+1)
                }

b :: Int -> State
b i = State $ do{ print $ "b(" ++ show i ++ ")"
                ; return $ c (i+1)
                }

c :: Int -> State
c i = State $ do{ print $ "c(" ++ show i ++ ")"
                ; return $ a (i+1)
                }

main = iterateM_ runState $ a 1


The problem with your Haskell code is, that type only introduces a synonym, which is quite similar to what typedef in C does. One important restriction is, that the expansion of the type must be finite, you can't give a finite expansion of your state machine. A solution is using a newtype: A newtype is a wrapper that does only exist for the type checker; there is absolutely zero overhead (excluded stuff that occurs because of generalization that can't be removed). Here is your signature; it typechecks:

newtype FN = FN { unFM :: (IO FN) }

Please note, that whenever you want to use an FN, you have to unpack it first using unFN. Whenever you return a new function, use FN to pack it.


In the C-like type systems functions are not first order citizens. There are certain restrictions on handling them. That was a decision for simplicity and speed of implementation/execution that stuck. To have functions behave like objects, one generally requires support for closures. Those however are not naturally supported by mosts processors' instruction sets. As C was designed to be close to the metal, there was no support for them.

When declaring recursive structures in C, the type must be fully expandable. A consequence of this is, that you can only have pointers as self-references in struct declarations:

struct rec;
struct rec {
    struct rec *next;
};

Also every identifier we use has to be declared. One of the restrictions of function-types is, that one can not forward declare them.

A state machine in C usually works by making a mapping from integers to functions, either in a switch statement or in a jump table:

typedef int (*func_t)();

void run() {
    func_t table[] = {a, b, c};

    int state = 0;

    while(True) {
        state = table[state]();
    }
}

Alternatively you could profile your Python code and try to find out why your code is slow. You can port the critical parts to C/C++ and keep using Python for the state machine.


As usual, despite the great answers already present, I couldn't resist trying it out for myself. One thing that bothered me about what is presented is that it ignores input. State machines--the ones that I am familiar with--choose between various possible transitions based on input.

data State vocab = State { stateId :: String
                         , possibleInputs :: [vocab]
                         , _runTrans :: (vocab -> State vocab)
                         }
                      | GoalState { stateId :: String }

instance Show (State a) where
  show = stateId

runTransition :: Eq vocab => State vocab -> vocab -> Maybe (State vocab)
runTransition (GoalState id) _                   = Nothing
runTransition s x | x `notElem` possibleInputs s = Nothing
                  | otherwise                    = Just (_runTrans s x)

Here I define a type State, which is parameterized by a vocabulary type vocab. Now let's define a way that we can trace the execution of a state machine by feeding it inputs.

traceMachine :: (Show vocab, Eq vocab) => State vocab -> [vocab] -> IO ()
traceMachine _ [] = putStrLn "End of input"
traceMachine s (x:xs) = do
  putStrLn "Current state: "
  print s
  putStrLn "Current input: "
  print x
  putStrLn "-----------------------"
  case runTransition s x of
    Nothing -> putStrLn "Invalid transition"
    Just s' -> case s' of
      goal@(GoalState _) -> do
        putStrLn "Goal state reached:"
        print s'
        putStrLn "Input remaining:"
        print xs
      _ -> traceMachine s' xs

Now let's try it out on a simple machine that ignores its inputs. Be warned: the format I have chosen is rather verbose. However, each function that follows can be viewed as a node in a state machine diagram, and I think you'll find the verbosity to be completely relevant to describing such a node. I've used stateId to encode in string format some visual information about how that state behaves.

data SimpleVocab = A | B | C deriving (Eq, Ord, Show, Enum)

simpleMachine :: State SimpleVocab
simpleMachine = stateA

stateA :: State SimpleVocab
stateA = State { stateId = "A state. * -> B"
               , possibleInputs = [A,B,C]
               , _runTrans = \_ -> stateB
               }

stateB :: State SimpleVocab
stateB = State { stateId = "B state. * -> C"
               , possibleInputs = [A,B,C]
               , _runTrans = \_ -> stateC
               }

stateC :: State SimpleVocab
stateC = State { stateId = "C state. * -> A"
               , possibleInputs = [A,B,C]
               , _runTrans = \_ -> stateA
               }

Since the inputs don't matter for this state machine, you can feed it anything.

ghci> traceMachine simpleMachine [A,A,A,A]

I won't include the output, which is also very verbose, but you can see it clearly moves from stateA to stateB to stateC and back to stateA again. Now let's make a slightly more complicated machine:

lessSimpleMachine :: State SimpleVocab
lessSimpleMachine = startNode

startNode :: State SimpleVocab
startNode = State { stateId = "Start node. A -> 1, C -> 2"
                  , possibleInputs = [A,C]
                  , _runTrans = startNodeTrans
                  }
  where startNodeTrans C = node2
        startNodeTrans A = node1

node1 :: State SimpleVocab
node1 = State { stateId = "node1. B -> start, A -> goal"
              , possibleInputs = [B, A]
              , _runTrans = node1trans
              }
  where node1trans B = startNode
        node1trans A = goalNode

node2 :: State SimpleVocab
node2 = State { stateId = "node2. C -> goal, A -> 1, B -> 2"
              , possibleInputs = [A,B,C]
              , _runTrans = node2trans
              }
  where node2trans A = node1
        node2trans B = node2
        node2trans C = goalNode

goalNode :: State SimpleVocab
goalNode = GoalState "Goal. :)"

The possible inputs and transitions for each node should require no further explanation, as they are verbosely described in the code. I'll let you play with traceMachine lessSipmleMachine inputs for yourself. See what happens when inputs is invalid (does not adhere to the "possible inputs" restrictions), or when you hit a goal node before the end of input.

I suppose the verbosity of my solution sort of fails what you were basically asking, which was to cut down on the cruft. But I think it also illustrates how descriptive Haskell code can be. Even though it is very verbose, it is also very straightforward in how it represents nodes of a state machine diagram.


Iit's not hard to make state machines in Haskell, once you realize that they are not monads! A state machine like the one you want is an arrow, an automaton arrow to be exact:

newtype State a b = State (a -> (b, State a b))

This is a function, which takes an input value and produces an output value along with a new version of itself. This is not a monad, because you cannot write join or (>>=) for it. Equivalently once you have turned this into an arrow you will realize that it's impossible to write an ArrowApply instance for it.

Here are the instances:

import Control.Arrow
import Control.Category
import Prelude hiding ((.), id)

instance Category State where
    id = State $ \x -> (x, id)

    State f . State g =
        State $ \x ->
            let (y, s2) = g x
                (z, s1) = f y
            in (z, s1 . s2)

instance Arrow State where
    arr f = let s = State $ \x -> (f x, s) in s
    first (State f) =
        State $ \(x1, x2) ->
            let (y1, s) = f x1
            in ((y1, x2), first s)

Have fun.


What you want is a recursive type. Different languages have different ways of doing this.

For example, in OCaml (a statically-typed language), there is an optional compiler/interpreter flag -rectypes that enables support for recursive types, allowing you to define stuff like this:

let rec a () = print_endline "a()"; b
and b () = print_endline "b()"; c
and c () = print_endline "c()"; a
;;

Although this is not "ugly" as you complained about in your C example, what happens underneath is still the same. The compiler simply worries about that for you instead of forcing you to write it out.

As others have pointed out, in Haskell you can use newtype and there won't be any "overhead". But you complain about having to explicitly wrap and unwrap the recursive type, which is "ugly". (Similarly with your C example; there is no "overhead" since at the machine level a 1-member struct is identical to its member, but it is "ugly".)

Another example I want to mention is Go (another statically-typed language). In Go, the type construct defines a new type. It is not a simple alias (like typedef in C or type in Haskell), but creates a full-fledged new type (like newtype in Haskell) because such a type has an independent "method set" of methods that you can define on it. Because of this, the type definition can be recursive:

type Fn func () Fn


You can get the same effect in C as in Python code,- just declare that functions return (void*):

#include "stdio.h"

typedef void* (*myFunc)(void);

void* a(void);
void* b(void);
void* c(void);

void* a(void) {
    printf("a()\n");
    return b;
}

void* b(void) {
    printf("b()\n");
    return c;
}

void* c(void) {
    printf("c()\n");
    return a;
}

void main() {
    void* state = a;
    while (1) {
        state = ((myFunc)state)();
        sleep(1);
    }
}


Your problem has been had before: Recursive declaration of function pointer in C

C++ operator overloading can be used to hide the mechanics of what is essentially the same as your your C and Haskell solutions, as Herb Sutter describes in GotW #57: Recursive Declarations.

struct FuncPtr_;
typedef FuncPtr_ (*FuncPtr)();

struct FuncPtr_
{
  FuncPtr_( FuncPtr pp ) : p( pp ) { }
  operator FuncPtr() { return p; }
  FuncPtr p;
};

FuncPtr_ f() { return f; } // natural return syntax

int main()
{
  FuncPtr p = f();  // natural usage syntax
  p();
}

But this business with functions will, in all likelihood, perform worse than the equivalent with numeric states. You should use a switch statement or a state table, because what you really want in this situation is a structured semantic equivalent to goto.


An example in F#:

type Cont = Cont of (unit -> Cont)

let rec a() =
    printfn "a()"
    Cont (fun () -> b 42)

and b n =
    printfn "b(%d)" n
    Cont c

and c() =
    printfn "c()"
    Cont a

let rec run (Cont f) =
    let f = f()
    run f

run (Cont a)

Regarding the question "why is it so hard to implement state machines using functions in statically typed languages?": That's because the type of of a and friends is a little bit weird: a function that when returns a function that returns a function that returns a function...

If I remove Cont from my example the F# compiler complains and says:

Expecting 'a but given unit -> 'a. The resulting type would be infinite when unifying 'a and unit -> 'a.

Another answer shows a solution in OCaml whose type inference is strong enough to remove the need for declaring Cont, which shows static typing is not to blame, rather the lack of powerful type inference in many statically typed languages.

I don't know why F# doesn't do it, I would guess maybe this would make the type inference algorithm more complicated, slower, or "too powerful" (it could manage to infer the type of incorrectly typed expressions, failing at a later point giving error messages that are hard to understand).

Note that the Python example you gave isn't really safe. In my example, b represents a family of states parameterized by an integer. In an untyped language, it's easy to make a mistake and return b or b 42 instead of the correct lambda and miss that mistake until the code is executed.


The Python code you posted will be transformed into a recursive function, but it will not be tail call optimized because Python has no tail call optimization, so it will stack overflow at some point. So the Python code is actually broken, and would take more work to get it as good as the Haskell or C versions.

Here is an example of what I mean:

so.py:

import threading
stack_size_bytes = 10**5
threading.stack_size(10**5)
machine_word_size = 4

def t1():
    print "start t1"
    n = stack_size_bytes/machine_word_size
    while n:
        n -= 1
    print "done t1"

def t2():
    print "start t2"
    n = stack_size_bytes/machine_word_size+1
    while n:
        n -= 1
    print "done t2"

if __name__ == "__main__":
    t = threading.Thread(target=t1)
    t.start()
    t.join()
    t = threading.Thread(target=t2)
    t.start()
    t.join()

shell:

$ python so.py
start t1
done t1
start t2
Exception in thread Thread-2:
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 530, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.7/threading.py", line 483, in run
    self.__target(*self.__args, **self.__kwargs)
  File "so.py", line 18, in t2
    print "done t2"
RuntimeError: maximum recursion depth exceeded
0

精彩评论

暂无评论...
验证码 换一张
取 消

关注公众号